@@ -7,6 +7,7 @@ import akka.actor._
7
7
import play .api .libs .json .JsValue
8
8
import scala .collection .immutable .TreeMap
9
9
import scala .concurrent .duration ._
10
+ import scala .annotation .tailrec
10
11
11
12
object ActiveActive {
12
13
import ReplicationProtocol ._
@@ -50,7 +51,6 @@ object ActiveActive {
50
51
def deadline : Deadline
51
52
def missing : Set [ActorRef ]
52
53
def add (res : SeqResult ): ReplyState
53
- def isKnown : Boolean
54
54
def isFinished : Boolean = missing.isEmpty
55
55
}
56
56
private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ]) extends ReplyState {
@@ -66,88 +66,94 @@ object ActiveActive {
66
66
} else Unknown (deadline, nextReplies, missing - res.replica)
67
67
} else Unknown (deadline, nextReplies, missing - res.replica)
68
68
}
69
- override def isKnown = false
70
69
}
71
70
private case class Known (deadline : Deadline , reply : SeqResult , wrong : Set [ActorRef ], missing : Set [ActorRef ]) extends ReplyState {
72
71
override def add (res : SeqResult ): ReplyState = {
73
72
val nextWrong = if (res.res == reply.res) wrong else wrong + res.replica
74
73
Known (deadline, reply, nextWrong, missing - res.replica)
75
74
}
76
- override def isKnown = true
77
75
}
78
76
79
77
class Coordinator (N : Int ) extends Actor {
80
- private var replicas = (1 to N ).map(_ => context.actorOf( Replica .props )).toSet
78
+ private var replicas = (1 to N ).map(_ => newReplica( )).toSet
81
79
private val seqNr = Iterator from 0
82
80
private var replies = TreeMap .empty[Int , ReplyState ]
83
81
private var nextReply = 0
84
82
83
+ override def supervisorStrategy = SupervisorStrategy .stoppingStrategy
84
+
85
+ private def newReplica (): ActorRef =
86
+ context.watch(context.actorOf(Replica .props))
87
+
88
+ // schedule timeout messages for quiescent periods
85
89
context.setReceiveTimeout(1 .second)
86
90
87
- def receive = {
91
+ def receive = ( {
88
92
case cmd : Command =>
89
93
val c = SeqCommand (seqNr.next, cmd, self)
90
94
replicas foreach (_ ! c)
91
95
replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas)
92
- doTimeouts()
93
96
case res : SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
94
97
val prevState = replies(res.seq)
95
98
val nextState = prevState.add(res)
96
- // potentially send reply if quorum of replies has been received now
97
- nextState match {
98
- case Known (seq, reply, _, _) if ! prevState.isKnown && seq == nextReply =>
99
- reply.replyTo ! reply.res
100
- nextReply += 1
101
- case _ =>
102
- }
103
- // clean up state
104
- if (nextState.isFinished) {
105
- dispose(nextState)
106
- replies -= res.seq
107
- } else {
108
- replies += res.seq -> nextState
109
- doTimeouts()
110
- }
111
- case ReceiveTimeout => doTimeouts()
99
+ replies += res.seq -> nextState
100
+ case Terminated (ref) =>
101
+ replaceReplica(ref, terminate = false )
102
+ case ReceiveTimeout =>
103
+ }: Receive ) andThen { _ =>
104
+ doTimeouts()
105
+ sendReplies()
106
+ evictFinished()
112
107
}
113
108
114
109
private def doTimeouts (): Unit = {
115
110
val now = Deadline .now
116
111
val expired = replies.iterator.takeWhile(_._2.deadline <= now)
117
- expired.map(_._2).foreach(dispose)
112
+ for ((seq, state) <- expired) {
113
+ state match {
114
+ case Unknown (deadline, received, missing) =>
115
+ // did not reach consensus on this one, pick simple majority
116
+ val counts = received.groupBy(_.res)
117
+ val biggest = counts.iterator.map(_._2.size).max
118
+ val winners = counts.collectFirst {
119
+ case (res, win) if win.size == biggest => win
120
+ }.get
121
+ val losers = (received -- winners).map(_.replica)
122
+ // don’t wait for further responses
123
+ replies += seq -> Known (deadline, winners.head, losers, Set .empty)
124
+ case Known (deadline, reply, wrong, missing) =>
125
+ // don’t wait for further responses
126
+ replies += seq -> Known (deadline, reply, wrong, Set .empty)
127
+ }
128
+ }
118
129
}
119
130
120
- /**
121
- * The given reply state has been removed from the replies map and is now
122
- * being disposed of. This means that we need to act upon wrong replies
123
- * from replicas.
124
- *
125
- * If there are replicas for which no reply has been recorded yet, we
126
- * ignore them. If they reply incorrectly later they will be replaced then.
127
- * GC pauses are tolerated: do not kick out replicas just for being slow.
128
- */
129
- private def dispose (state : ReplyState ): Unit =
130
- state match {
131
- case Unknown (_, replies, _) =>
132
- // did not reach consensus on this one, pick simple majority
133
- val counts = replies.toList.groupBy(_.res)
134
- val biggest = counts.iterator.map(_._2.size).max
135
- val winners = counts.collectFirst {
136
- case (res, win) if win.size == biggest => win
137
- }.get
138
- val losers = replicas -- winners.iterator.map(_.replica).toSet
139
- losers foreach replaceReplica
140
- case Known (_, _, wrong, _) =>
141
- wrong foreach replaceReplica
131
+ @ tailrec private def sendReplies (): Unit =
132
+ replies.get(nextReply) match {
133
+ case Some (k @ Known (_, reply, _, _)) =>
134
+ reply.replyTo ! reply.res
135
+ nextReply += 1
136
+ sendReplies()
137
+ case _ =>
142
138
}
143
139
144
- private def replaceReplica (r : ActorRef ): Unit = {
145
- replicas -= r
146
- r ! PoisonPill
147
- val newReplica = context.actorOf(Replica .props)
148
- replicas.head ! SendInitialData (newReplica)
149
- replicas += newReplica
150
- }
140
+ @ tailrec private def evictFinished (): Unit =
141
+ replies.headOption match {
142
+ case Some ((seq, k @ Known (_, _, wrong, _))) if k.isFinished =>
143
+ wrong foreach (replaceReplica(_, terminate = true ))
144
+ replies -= seq
145
+ evictFinished()
146
+ case _ =>
147
+ }
148
+
149
+ private def replaceReplica (r : ActorRef , terminate : Boolean ): Unit =
150
+ if (replicas contains r) {
151
+ replicas -= r
152
+ if (terminate) r ! PoisonPill
153
+ val replica = newReplica()
154
+ replicas.head ! SendInitialData (replica)
155
+ replicas += replica
156
+ }
151
157
}
152
158
153
159
}
0 commit comments