Skip to content

Commit 89a0f24

Browse files
committed
optimize CRDT formulation
1 parent 871a69d commit 89a0f24

File tree

1 file changed

+146
-29
lines changed

1 file changed

+146
-29
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,32 @@
11
package com.reactivedesignpatterns.chapter12
22

3-
import akka.cluster.ddata.ReplicatedData
3+
import akka.actor._
4+
import akka.cluster.ddata._
5+
import scala.concurrent.duration._
6+
import akka.cluster.Cluster
7+
import com.typesafe.config.ConfigFactory
48

59
object MultiMasterCRDT {
10+
11+
private var statusMap = Map.empty[String, Status]
612

7-
sealed trait Status extends ReplicatedData {
13+
final case class Status(val name: String)(_pred: => Set[Status], _succ: => Set[Status]) extends ReplicatedData {
814
type T = Status
915
def merge(that: Status): Status = mergeStatus(this, that)
16+
17+
lazy val predecessors = _pred
18+
lazy val successors = _succ
1019

11-
def predecessors: Set[Status]
12-
def successors: Set[Status]
20+
if (!statusMap.contains(name)) statusMap += name -> this
21+
private def readResolve: AnyRef = statusMap(name)
1322
}
1423

15-
case object New extends Status {
16-
lazy val predecessors: Set[Status] = Set.empty
17-
lazy val successors: Set[Status] = Set(Scheduled)
18-
}
19-
case object Scheduled extends Status {
20-
lazy val predecessors: Set[Status] = Set(New)
21-
lazy val successors: Set[Status] = Set(Executing, Cancelled)
22-
}
23-
case object Executing extends Status {
24-
lazy val predecessors: Set[Status] = Set(Scheduled)
25-
lazy val successors: Set[Status] = Set(Aborted)
26-
}
27-
case object Finished extends Status {
28-
lazy val predecessors: Set[Status] = Set(Executing, Aborted)
29-
lazy val successors: Set[Status] = Set.empty
30-
}
31-
case object Cancelled extends Status {
32-
lazy val predecessors: Set[Status] = Set(New, Scheduled)
33-
lazy val successors: Set[Status] = Set(Aborted)
34-
}
35-
case object Aborted extends Status {
36-
lazy val predecessors: Set[Status] = Set(Cancelled, Executing)
37-
lazy val successors: Set[Status] = Set(Finished)
38-
}
24+
val New: Status = Status("new")(Set.empty, Set(Scheduled, Cancelled))
25+
val Scheduled: Status = Status("scheduled")(Set(New), Set(Executing, Cancelled))
26+
val Executing: Status = Status("executing")(Set(Scheduled), Set(Aborted, Finished))
27+
val Finished: Status = Status("finished")(Set(Executing, Aborted), Set.empty)
28+
val Cancelled: Status = Status("cancelled")(Set(New, Scheduled), Set(Aborted))
29+
val Aborted: Status = Status("aborted")(Set(Cancelled, Executing), Set(Finished))
3930

4031
def mergeStatus(left: Status, right: Status): Status = {
4132
/*
@@ -61,5 +52,131 @@ object MultiMasterCRDT {
6152

6253
innerLoop(right, Set.empty)
6354
}
55+
56+
object StorageComponent extends Key[ORMap[Status]]("StorageComponent")
57+
58+
case class Submit(job: String)
59+
case class Cancel(job: String)
60+
case class Execute(job: String)
61+
case class Finish(job: String)
62+
case object PrintStatus
63+
64+
class ClientInterface extends Actor with ActorLogging {
65+
val replicator = DistributedData(context.system).replicator
66+
implicit val cluster = Cluster(context.system)
67+
68+
def receive = {
69+
case Submit(job) =>
70+
log.info("submitting job {}", job)
71+
replicator ! Replicator.Update(StorageComponent, ORMap.empty[Status], Replicator.WriteMajority(5.seconds), Some(s"submit $job"))(_ + (job -> New))
72+
case Cancel(job) =>
73+
log.info("cancelling job {}", job)
74+
replicator ! Replicator.Update(StorageComponent, ORMap.empty[Status], Replicator.WriteMajority(5.seconds), Some(s"cancel $job"))(_ + (job -> Cancelled))
75+
case r: Replicator.UpdateResponse[_] =>
76+
log.info("received update result: {}", r)
77+
case PrintStatus =>
78+
replicator ! Replicator.Get(StorageComponent, Replicator.ReadMajority(5.seconds))
79+
case g: Replicator.GetSuccess[_] =>
80+
log.info("overall status: {}", g.get(StorageComponent))
81+
}
82+
}
83+
84+
class Executor extends Actor with ActorLogging {
85+
val replicator = DistributedData(context.system).replicator
86+
implicit val cluster = Cluster(context.system)
87+
88+
var lastState = Map.empty[String, Status]
89+
90+
replicator ! Replicator.Subscribe(StorageComponent, self)
91+
92+
def receive = {
93+
case Execute(job) =>
94+
log.info("executing job {}", job)
95+
replicator ! Replicator.Update(StorageComponent, ORMap.empty[Status], Replicator.WriteMajority(5.seconds), Some(s"submit $job")) { map =>
96+
if (map.get(job) == Some(New)) map + (job -> Executing)
97+
else map
98+
}
99+
case Finish(job) =>
100+
log.info("job {} finished", job)
101+
replicator ! Replicator.Update(StorageComponent, ORMap.empty[Status], Replicator.WriteMajority(5.seconds), Some(s"cancel $job"))(_ + (job -> Finished))
102+
case r: Replicator.UpdateResponse[_] =>
103+
log.info("received update result: {}", r)
104+
case ch: Replicator.Changed[_] =>
105+
val current = ch.get(StorageComponent).entries
106+
for {
107+
(job, status) <- current.iterator
108+
if (status == Aborted)
109+
if (lastState.get(job) != Some(Aborted))
110+
} log.info("aborting job {}", job)
111+
lastState = current
112+
}
113+
}
114+
115+
val commonConfig = ConfigFactory.parseString("""
116+
akka.actor.provider = akka.cluster.ClusterActorRefProvider
117+
akka.remote.netty.tcp {
118+
host = "127.0.0.1"
119+
port = 0
120+
}
121+
akka.cluster {
122+
gossip-interval = 100ms
123+
failure-detector {
124+
heartbeat-interval = 100ms
125+
acceptable-heartbeat-pause = 500ms
126+
}
127+
distributed-data.gossip-interval = 100ms
128+
}
129+
""")
64130

65-
}
131+
object sleep
132+
implicit object waitConvert extends DurationConversions.Classifier[sleep.type] {
133+
type R = Unit
134+
def convert(d: FiniteDuration): Unit = Thread.sleep(d.toMillis)
135+
}
136+
137+
def main(args: Array[String]): Unit = {
138+
val sys1 = ActorSystem("MultiMasterCRDT", commonConfig)
139+
val addr1 = Cluster(sys1).selfAddress
140+
Cluster(sys1).join(addr1)
141+
142+
val sys2 = ActorSystem("MultiMasterCRDT", commonConfig)
143+
Cluster(sys2).join(addr1)
144+
145+
awaitMembers(sys1, 2)
146+
147+
val clientInterface = sys1.actorOf(Props(new ClientInterface), "clientInterface")
148+
val executor = sys2.actorOf(Props(new Executor), "executor")
149+
150+
clientInterface ! Submit("alpha")
151+
clientInterface ! Submit("beta")
152+
clientInterface ! Submit("gamma")
153+
clientInterface ! Submit("delta")
154+
1 second sleep
155+
executor ! Execute("alpha")
156+
executor ! Execute("gamma")
157+
clientInterface ! Cancel("delta")
158+
1 second sleep
159+
clientInterface ! Cancel("alpha")
160+
clientInterface ! Cancel("beta")
161+
executor ! Execute("beta")
162+
1 second sleep
163+
clientInterface ! Cancel("gamma")
164+
1 second sleep
165+
executor ! Finish("gamma")
166+
3 seconds sleep
167+
clientInterface ! PrintStatus
168+
1 second sleep
169+
170+
sys1.terminate()
171+
sys2.terminate()
172+
}
173+
174+
private def awaitMembers(sys: ActorSystem, count: Int): Unit = {
175+
while (Cluster(sys).state.members.size < count) {
176+
Thread.sleep(500)
177+
print('.')
178+
Console.flush()
179+
}
180+
println("cluster started")
181+
}
182+
}

0 commit comments

Comments
 (0)