@@ -17,6 +17,7 @@ import scala.annotation.tailrec
17
17
import akka .contrib .pattern .ClusterSingletonManager
18
18
import akka .contrib .pattern .ClusterSingletonProxy
19
19
import com .typesafe .config .ConfigFactory
20
+ import scala .io .StdIn
20
21
21
22
object ActivePassive {
22
23
import ReplicationProtocol ._
@@ -37,6 +38,8 @@ object ActivePassive {
37
38
private val toReplicate = Queue .empty[Replicate ]
38
39
private var replicating = TreeMap .empty[Int , (Replicate , Int )]
39
40
41
+ private var rejected = 0
42
+
40
43
val cluster = Cluster (context.system)
41
44
42
45
import context .dispatcher
@@ -45,9 +48,10 @@ object ActivePassive {
45
48
46
49
log.info(" taking over from local replica" )
47
50
localReplica ! TakeOver (self)
48
-
51
+
49
52
def receive = {
50
53
case InitialState (m, s) =>
54
+ log.info(" took over at sequence {}" , s)
51
55
theStore = m
52
56
seqNr = Iterator from s
53
57
context.become(running)
@@ -56,11 +60,12 @@ object ActivePassive {
56
60
}
57
61
58
62
val running : Receive = {
59
- case Put (key, value, replyTo) =>
63
+ case p @ Put (key, value, replyTo) =>
60
64
if (toReplicate.size < MaxOutstanding ) {
61
65
toReplicate.enqueue(Replicate (seqNr.next, key, value, replyTo))
62
66
replicate()
63
67
} else {
68
+ rejected += 1
64
69
replyTo ! PutRejected (key, value)
65
70
}
66
71
case Get (key, replyTo) =>
@@ -69,13 +74,17 @@ object ActivePassive {
69
74
replicating.valuesIterator foreach {
70
75
case (replicate, count) => disseminate(replicate)
71
76
}
77
+ if (rejected > 0 ) {
78
+ log.info(" rejected {} PUT requests" , rejected)
79
+ rejected = 0
80
+ }
72
81
case Replicated (confirm) =>
73
82
replicating.get(confirm) match {
74
83
case None => // already removed
75
84
case Some ((rep, 1 )) =>
76
85
replicating -= confirm
77
86
theStore += rep.key -> rep.value
78
- rep.replyTo ! PutConfirmed (rep.key)
87
+ rep.replyTo ! PutConfirmed (rep.key, rep.value )
79
88
case Some ((rep, n)) =>
80
89
replicating += confirm -> (rep, n - 1 )
81
90
}
@@ -106,7 +115,7 @@ object ActivePassive {
106
115
private val applied = Queue .empty[Replicate ]
107
116
private var waiting = TreeMap .empty[Int , Replicate ]
108
117
109
- val name = self.path.address .toString
118
+ val name = Cluster (context.system).selfAddress .toString.replaceAll( " [:/] " , " _ " )
110
119
val cluster = Cluster (context.system)
111
120
val random = new Random
112
121
@@ -116,26 +125,30 @@ object ActivePassive {
116
125
expectedSeq = s + 1
117
126
}
118
127
log.info(" started at sequence {}" , expectedSeq)
119
-
128
+
120
129
override def postStop (): Unit = {
121
130
log.info(" stopped at sequence {}" , expectedSeq)
122
131
}
123
132
124
133
def receive = {
125
134
case TakeOver (active) =>
135
+ log.info(" active replica starting at sequence {}" , expectedSeq)
126
136
active ! InitialState (theStore, expectedSeq)
127
137
case Replicate (s, _, _, replyTo) if s < expectedSeq =>
128
138
replyTo ! Replicated (s)
129
139
case r : Replicate =>
130
140
waiting += r.seq -> r
131
141
consolidate()
132
142
case GetSingle (s, replyTo) =>
143
+ log.info(" GetSingle from {}" , replyTo)
133
144
if (applied.nonEmpty && applied.head.seq <= s && applied.last.seq >= s)
134
145
replyTo ! applied.find(_.seq == s).get
135
146
else if (s < expectedSeq) replyTo ! InitialState (theStore, expectedSeq)
136
147
case GetFull (replyTo) =>
148
+ log.info(" sending full info to {}" , replyTo)
137
149
replyTo ! InitialState (theStore, expectedSeq)
138
150
case InitialState (m, s) if s > expectedSeq =>
151
+ log.info(" received newer state at sequence {} (was at {})" , s, expectedSeq)
139
152
theStore = m
140
153
expectedSeq = s + 1
141
154
persist(name, s, m)
@@ -163,12 +176,12 @@ object ActivePassive {
163
176
}
164
177
waiting = waiting.drop(prefix)
165
178
}
166
-
179
+
167
180
// cap the size of the applied buffer
168
181
applied.drop(Math .max(0 , applied.size - 10 ))
169
182
170
183
// check if we fell behind by too much
171
- if (waiting.lastKey - expectedSeq > 10 ) {
184
+ if (waiting.nonEmpty && (waiting. lastKey - expectedSeq > 10 ) ) {
172
185
val outstanding = (expectedSeq to waiting.lastKey).iterator.filterNot(waiting.contains).toList
173
186
if (outstanding.size <= 3 ) outstanding foreach askAround
174
187
else askAroundFullState()
@@ -180,17 +193,23 @@ object ActivePassive {
180
193
random.shuffle(cluster.state.members.iterator.map(_.address).toSeq).take(n)
181
194
}
182
195
private def askAround (seq : Int ): Unit = {
196
+ log.info(" asking around for sequence number {}" , seq)
183
197
getMembers(askAroundCount).foreach(addr => replicaOn(addr) ! GetSingle (seq, self))
184
198
}
185
199
private def askAroundFullState (): Unit = {
200
+ log.info(" asking for full data" )
186
201
getMembers(1 ).foreach(addr => replicaOn(addr) ! GetFull (self))
187
202
}
188
203
private def replicaOn (addr : Address ): ActorSelection =
189
204
context.actorSelection(self.path.toStringWithAddress(addr))
190
205
}
191
-
206
+
192
207
val commonConfig = ConfigFactory .parseString("""
193
208
akka.actor.provider = akka.cluster.ClusterActorRefProvider
209
+ akka.remote.netty.tcp {
210
+ host = "127.0.0.1"
211
+ port = 0
212
+ }
194
213
akka.cluster {
195
214
gossip-interval = 100ms
196
215
failure-detector {
@@ -199,51 +218,126 @@ object ActivePassive {
199
218
}
200
219
}
201
220
""" )
202
- def roleConfig (name : String ) = ConfigFactory .parseString(s """ akka.cluster.roles = [" $name"] """ )
221
+ def roleConfig (name : String , port : Option [Int ]) = {
222
+ val roles = ConfigFactory .parseString(s """ akka.cluster.roles = [" $name"] """ )
223
+ port match {
224
+ case None => roles
225
+ case Some (p) =>
226
+ ConfigFactory .parseString(s """ akka.remote.netty.tcp.port = $p""" )
227
+ .withFallback(roles)
228
+ }
229
+ }
203
230
204
- def start (n : Int ): ActorSystem = {
205
- val system = ActorSystem (s " node $n " , roleConfig(" backend" ) withFallback commonConfig)
231
+ def start (port : Option [ Int ] ): ActorSystem = {
232
+ val system = ActorSystem (" ActivePassive " , roleConfig(" backend" , port ) withFallback commonConfig)
206
233
val localReplica = system.actorOf(Props (new Passive (3 , 3 .seconds)), " passive" )
207
234
val managerProps =
208
- ClusterSingletonManager .props(Props (new Active (localReplica, 2 , 10 )), " active" , PoisonPill , Some (" backend" ))
235
+ ClusterSingletonManager .props(Props (new Active (localReplica, 2 , 120 )), " active" , PoisonPill ,
236
+ role = Some (" backend" ), retryInterval = 150 .millis)
209
237
val manager = system.actorOf(managerProps, " activeManager" )
210
238
system
211
239
}
212
240
213
241
def main (args : Array [String ]): Unit = {
214
- val systems = Array .tabulate (5 )(start(_ ))
242
+ val systems = Array .fill (5 )(start(None ))
215
243
val seedNode = Cluster (systems(0 )).selfAddress
216
244
systems foreach (Cluster (_).join(seedNode))
217
245
218
- val sys = ActorSystem (" ActivePassive" , commonConfig)
246
+ val sys = ActorSystem (" ActivePassive" , ConfigFactory .parseString( " akka.loglevel=INFO " ) withFallback commonConfig)
219
247
Cluster (sys).join(seedNode)
248
+
249
+ awaitMembers(sys, systems.length + 1 )
250
+
251
+ val proxy = sys.actorOf(ClusterSingletonProxy .props(" /user/activeManager/active" , Some (" backend" )), " proxy" )
252
+
253
+ val useStorage = sys.actorOf(Props (new UseStorage (proxy)), " useStorage" )
254
+ useStorage ! Run (0 )
255
+
256
+ sys.actorOf(Props (new Actor {
257
+ def receive = {
258
+ case Run =>
259
+ StdIn .readLine()
260
+ useStorage ! Stop
261
+ }
262
+ })) ! Run
263
+
264
+ Thread .sleep(10000 )
220
265
221
- while (Cluster (sys).state.members.size < 6 ) {
266
+ val rnd = new Random
267
+ while (! terminate) {
268
+ Thread .sleep(5000 )
269
+ val sysidx = rnd.nextInt(systems.length)
270
+ val oldsys = systems(sysidx)
271
+ val port = Cluster (oldsys).selfAddress.port
272
+ oldsys.shutdown()
273
+ oldsys.awaitTermination()
274
+ val newsys = start(port)
275
+ val seed = Cluster (if (sysidx == 0 ) systems(1 ) else systems(0 )).selfAddress
276
+ Cluster (newsys).join(seed)
277
+ systems(sysidx) = newsys
278
+ awaitMembers(sys, systems.length + 1 )
279
+ }
280
+
281
+ Thread .sleep(3000 )
282
+
283
+ sys.shutdown()
284
+ systems foreach (_.shutdown())
285
+ }
286
+
287
+ private def awaitMembers (sys : ActorSystem , count : Int ): Unit = {
288
+ while (Cluster (sys).state.members.size < count) {
222
289
Thread .sleep(500 )
223
290
print('.' )
224
291
Console .flush()
225
292
}
226
293
println(" cluster started" )
227
-
228
- val path = sys / " user" / " activeManager" / " active"
229
- val proxy = sys.actorOf(ClusterSingletonProxy .props(path.toString, Some (" backend" )))
230
-
231
- sys.actorOf(Props (new UseStorage (proxy)), " useStorage" ) ! Run
232
-
233
- while (! terminate) {
234
-
235
- }
236
294
}
237
-
238
- private case object Run
295
+
296
+ private case class Run ( round : Int )
239
297
private case object Stop
240
298
@ volatile private var terminate = false
241
-
242
- private class UseStorage (db : ActorRef ) extends Actor {
299
+
300
+ private class UseStorage (db : ActorRef ) extends Actor with ActorLogging {
301
+ val N = 200
302
+ var theStore = Map .empty[String , JsValue ]
303
+ val keys = (1 to N ).map(i => f " $i%03d " )
304
+ var outstanding = Set .empty[String ]
305
+ val rnd = new Random
306
+ var lastOutstandingCount = 0
307
+
243
308
def receive = {
244
- case Run =>
245
-
246
- self ! Run
309
+ case Run (0 ) =>
310
+ db ! Get (" initial" , self)
311
+ case GetResult (" initial" , _) =>
312
+ self ! Run (1 )
313
+ case Run (round) =>
314
+ if (round % 100 == 0 ) log.info(" round {}" , round)
315
+ val nowOutstanding = outstanding.size
316
+ if (nowOutstanding != lastOutstandingCount) {
317
+ lastOutstandingCount = nowOutstanding
318
+ log.info(" {} outstanding" , nowOutstanding)
319
+ }
320
+ for (k <- keys) {
321
+ db ! Get (k, self)
322
+ if (! outstanding.contains(k) && rnd.nextBoolean()) {
323
+ db ! Put (k, JsNumber (round), self)
324
+ outstanding += k
325
+ }
326
+ }
327
+ context.system.scheduler.scheduleOnce(100 .millis, self, Run (round + 1 ))(context.dispatcher)
328
+ case GetResult (key, value) =>
329
+ if (outstanding.contains(key)) {
330
+ outstanding -= key
331
+ value foreach (theStore += key -> _)
332
+ } else if (value != theStore.get(key)) {
333
+ log.warning(" returned wrong value for key {}: {} (expected {})" , key, value, theStore.get(key))
334
+ context.stop(self)
335
+ }
336
+ case PutConfirmed (key, value) =>
337
+ outstanding -= key
338
+ theStore += key -> value
339
+ case PutRejected (key, value) =>
340
+ outstanding -= key
247
341
case Stop => context.stop(self)
248
342
}
249
343
override def postStop (): Unit = terminate = true
0 commit comments