1
+ /**
2
+ * Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3
+ */
4
+ package com .reactivedesignpatterns .chapter12
5
+
6
+ import akka .actor ._
7
+ import play .api .libs .json .JsValue
8
+ import scala .collection .immutable .TreeMap
9
+ import scala .concurrent .duration ._
10
+
11
+ object ActiveActive {
12
+ import ReplicationProtocol ._
13
+ import Persistence ._
14
+
15
+ private case class SeqCommand (seq : Int , cmd : Command , replyTo : ActorRef )
16
+ private case class SeqResult (seq : Int , res : Result , replica : ActorRef , replyTo : ActorRef )
17
+
18
+ private case class SendInitialData (toReplica : ActorRef )
19
+ private case class InitialData (map : Map [String , JsValue ])
20
+
21
+ class Replica extends Actor with Stash {
22
+ var map = Map .empty[String , JsValue ]
23
+
24
+ def receive = {
25
+ case InitialData (m) =>
26
+ map = m
27
+ context.become(initialized)
28
+ unstashAll()
29
+ case _ => stash()
30
+ }
31
+
32
+ def initialized : Receive = {
33
+ case SeqCommand (seq, cmd, replyTo) =>
34
+ cmd match {
35
+ case Put (key, value, r) =>
36
+ map += key -> value
37
+ replyTo ! SeqResult (seq, PutConfirmed (key), self, r)
38
+ case Get (key, r) =>
39
+ replyTo ! SeqResult (seq, GetResult (key, map get key), self, r)
40
+ }
41
+ case SendInitialData (toReplica) => toReplica ! InitialData (map)
42
+ }
43
+ }
44
+
45
+ object Replica {
46
+ val props = Props (new Replica )
47
+ }
48
+
49
+ private sealed trait ReplyState {
50
+ def deadline : Deadline
51
+ def missing : Set [ActorRef ]
52
+ def add (res : SeqResult ): ReplyState
53
+ def isKnown : Boolean
54
+ def isFinished : Boolean = missing.isEmpty
55
+ }
56
+ private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ]) extends ReplyState {
57
+ override def add (res : SeqResult ): ReplyState = {
58
+ val quorum = (missing.size + 1 ) / 2
59
+ val nextReplies = replies + res
60
+ if (nextReplies.size >= quorum) {
61
+ val answer = replies.toSeq.groupBy(_.res).collectFirst { case (k, s) if s.size >= quorum => s.head }
62
+ if (answer.isDefined) {
63
+ val right = answer.get
64
+ val wrong = replies.collect { case SeqResult (_, res, replica, _) if res != right => replica }
65
+ Known (deadline, right, wrong, missing - res.replica)
66
+ } else Unknown (deadline, nextReplies, missing - res.replica)
67
+ } else Unknown (deadline, nextReplies, missing - res.replica)
68
+ }
69
+ override def isKnown = false
70
+ }
71
+ private case class Known (deadline : Deadline , reply : SeqResult , wrong : Set [ActorRef ], missing : Set [ActorRef ]) extends ReplyState {
72
+ override def add (res : SeqResult ): ReplyState = {
73
+ val nextWrong = if (res.res == reply.res) wrong else wrong + res.replica
74
+ Known (deadline, reply, nextWrong, missing - res.replica)
75
+ }
76
+ override def isKnown = true
77
+ }
78
+
79
+ class Coordinator (N : Int ) extends Actor {
80
+ private var replicas = (1 to N ).map(_ => context.actorOf(Replica .props)).toSet
81
+ private val seqNr = Iterator from 0
82
+ private var replies = TreeMap .empty[Int , ReplyState ]
83
+ private var nextReply = 0
84
+
85
+ context.setReceiveTimeout(1 .second)
86
+
87
+ def receive = {
88
+ case cmd : Command =>
89
+ val c = SeqCommand (seqNr.next, cmd, self)
90
+ replicas foreach (_ ! c)
91
+ replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas)
92
+ doTimeouts()
93
+ case res : SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
94
+ val prevState = replies(res.seq)
95
+ val nextState = prevState.add(res)
96
+ // potentially send reply if quorum of replies has been received now
97
+ nextState match {
98
+ case Known (seq, reply, _, _) if ! prevState.isKnown && seq == nextReply =>
99
+ reply.replyTo ! reply.res
100
+ nextReply += 1
101
+ case _ =>
102
+ }
103
+ // clean up state
104
+ if (nextState.isFinished) {
105
+ dispose(nextState)
106
+ replies -= res.seq
107
+ } else {
108
+ replies += res.seq -> nextState
109
+ doTimeouts()
110
+ }
111
+ case ReceiveTimeout => doTimeouts()
112
+ }
113
+
114
+ private def doTimeouts (): Unit = {
115
+ val now = Deadline .now
116
+ val expired = replies.iterator.takeWhile(_._2.deadline <= now)
117
+ expired.map(_._2).foreach(dispose)
118
+ }
119
+
120
+ /**
121
+ * The given reply state has been removed from the replies map and is now
122
+ * being disposed of. This means that we need to act upon wrong replies
123
+ * from replicas.
124
+ *
125
+ * If there are replicas for which no reply has been recorded yet, we
126
+ * ignore them. If they reply incorrectly later they will be replaced then.
127
+ * GC pauses are tolerated: do not kick out replicas just for being slow.
128
+ */
129
+ private def dispose (state : ReplyState ): Unit =
130
+ state match {
131
+ case Unknown (_, replies, _) =>
132
+ // did not reach consensus on this one, pick simple majority
133
+ val counts = replies.toList.groupBy(_.res)
134
+ val biggest = counts.iterator.map(_._2.size).max
135
+ val winners = counts.collectFirst {
136
+ case (res, win) if win.size == biggest => win
137
+ }.get
138
+ val losers = replicas -- winners.iterator.map(_.replica).toSet
139
+ losers foreach replaceReplica
140
+ case Known (_, _, wrong, _) =>
141
+ wrong foreach replaceReplica
142
+ }
143
+
144
+ private def replaceReplica (r : ActorRef ): Unit = {
145
+ replicas -= r
146
+ r ! PoisonPill
147
+ val newReplica = context.actorOf(Replica .props)
148
+ replicas.head ! SendInitialData (newReplica)
149
+ replicas += newReplica
150
+ }
151
+ }
152
+
153
+ }
0 commit comments