Skip to content

Commit 7493804

Browse files
committed
refine ActiveActive and start chapter13
in particular add EC2Worker
1 parent c4ae769 commit 7493804

File tree

5 files changed

+89
-18
lines changed

5 files changed

+89
-18
lines changed

build.sbt

+2
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ lazy val chapter04 = project dependsOn (common)
1111
lazy val chapter07 = project dependsOn (common)
1212

1313
lazy val chapter12 = project dependsOn (common)
14+
15+
lazy val chapter13 = project dependsOn (common)

chapter12/src/main/scala/com/reactivedesignpatterns/chapter12/ActiveActive.scala

+24-18
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ object ActiveActive {
3232

3333
def initialized: Receive = {
3434
case SeqCommand(seq, cmd, replyTo) =>
35+
// tracking of sequence numbers and resends is elided here
3536
cmd match {
3637
case Put(key, value, r) =>
3738
map += key -> value
@@ -53,18 +54,19 @@ object ActiveActive {
5354
def add(res: SeqResult): ReplyState
5455
def isFinished: Boolean = missing.isEmpty
5556
}
56-
private case class Unknown(deadline: Deadline, replies: Set[SeqResult], missing: Set[ActorRef]) extends ReplyState {
57+
private case class Unknown(deadline: Deadline, replies: Set[SeqResult], missing: Set[ActorRef], quorum: Int) extends ReplyState {
5758
override def add(res: SeqResult): ReplyState = {
58-
val quorum = (missing.size + 1) / 2
5959
val nextReplies = replies + res
60+
val nextMissing = missing - res.replica
6061
if (nextReplies.size >= quorum) {
6162
val answer = replies.toSeq.groupBy(_.res).collectFirst { case (k, s) if s.size >= quorum => s.head }
6263
if (answer.isDefined) {
6364
val right = answer.get
6465
val wrong = replies.collect { case SeqResult(_, res, replica, _) if res != right => replica }
65-
Known(deadline, right, wrong, missing - res.replica)
66-
} else Unknown(deadline, nextReplies, missing - res.replica)
67-
} else Unknown(deadline, nextReplies, missing - res.replica)
66+
Known(deadline, right, wrong, nextMissing)
67+
} else if (nextMissing.isEmpty) Known.fromUnknown(deadline, nextReplies)
68+
else Unknown(deadline, nextReplies, nextMissing, quorum)
69+
} else Unknown(deadline, nextReplies, nextMissing, quorum)
6870
}
6971
}
7072
private case class Known(deadline: Deadline, reply: SeqResult, wrong: Set[ActorRef], missing: Set[ActorRef]) extends ReplyState {
@@ -73,6 +75,18 @@ object ActiveActive {
7375
Known(deadline, reply, nextWrong, missing - res.replica)
7476
}
7577
}
78+
private object Known {
79+
def fromUnknown(deadline: Deadline, replies: Set[SeqResult]): Known = {
80+
// did not reach consensus on this one, pick simple majority
81+
val counts = replies.groupBy(_.res)
82+
val biggest = counts.iterator.map(_._2.size).max
83+
val winners = counts.collectFirst {
84+
case (res, win) if win.size == biggest => win
85+
}.get
86+
val losers = (replies -- winners).map(_.replica)
87+
Known(deadline, winners.head, losers, Set.empty)
88+
}
89+
}
7690

7791
class Coordinator(N: Int) extends Actor {
7892
private var replicas = (1 to N).map(_ => newReplica()).toSet
@@ -81,7 +95,7 @@ object ActiveActive {
8195
private var nextReply = 0
8296

8397
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
84-
98+
8599
private def newReplica(): ActorRef =
86100
context.watch(context.actorOf(Replica.props))
87101

@@ -92,7 +106,7 @@ object ActiveActive {
92106
case cmd: Command =>
93107
val c = SeqCommand(seqNr.next, cmd, self)
94108
replicas foreach (_ ! c)
95-
replies += c.seq -> Unknown(5 seconds fromNow, Set.empty, replicas)
109+
replies += c.seq -> Unknown(5 seconds fromNow, Set.empty, replicas, (replicas.size + 1) / 2)
96110
case res: SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
97111
val prevState = replies(res.seq)
98112
val nextState = prevState.add(res)
@@ -111,18 +125,10 @@ object ActiveActive {
111125
val expired = replies.iterator.takeWhile(_._2.deadline <= now)
112126
for ((seq, state) <- expired) {
113127
state match {
114-
case Unknown(deadline, received, missing) =>
115-
// did not reach consensus on this one, pick simple majority
116-
val counts = received.groupBy(_.res)
117-
val biggest = counts.iterator.map(_._2.size).max
118-
val winners = counts.collectFirst {
119-
case (res, win) if win.size == biggest => win
120-
}.get
121-
val losers = (received -- winners).map(_.replica)
122-
// don’t wait for further responses
123-
replies += seq -> Known(deadline, winners.head, losers, Set.empty)
128+
case Unknown(deadline, received, _, _) =>
129+
val forced = Known.fromUnknown(deadline, received)
130+
replies += seq -> forced
124131
case Known(deadline, reply, wrong, missing) =>
125-
// don’t wait for further responses
126132
replies += seq -> Known(deadline, reply, wrong, Set.empty)
127133
}
128134
}

chapter13/build.sbt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import Build._
2+
3+
libraryDependencies ++= Seq(akkaActor, amazonAWS, scalaJava8, akkaTestkit, junit, scalatest)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter13;
5+
6+
import java.util.List;
7+
8+
import scala.PartialFunction;
9+
import scala.concurrent.ExecutionContext;
10+
import scala.concurrent.Future;
11+
import akka.dispatch.Futures;
12+
import akka.japi.pf.PFBuilder;
13+
import akka.pattern.CircuitBreaker;
14+
15+
import com.amazonaws.AmazonClientException;
16+
import com.amazonaws.auth.AWSCredentials;
17+
import com.amazonaws.services.ec2.AmazonEC2Client;
18+
import com.amazonaws.services.ec2.model.Instance;
19+
import com.amazonaws.services.ec2.model.Reservation;
20+
import com.amazonaws.services.ec2.model.RunInstancesRequest;
21+
import com.amazonaws.services.ec2.model.RunInstancesResult;
22+
23+
public class EC2Worker {
24+
public Instance startInstance(AWSCredentials credentials) {
25+
AmazonEC2Client amazonEC2Client = new AmazonEC2Client(credentials);
26+
27+
RunInstancesRequest runInstancesRequest = new RunInstancesRequest()
28+
.withImageId("").withInstanceType("m1.small").withMinCount(1)
29+
.withMaxCount(1);
30+
31+
RunInstancesResult runInstancesResult = amazonEC2Client
32+
.runInstances(runInstancesRequest);
33+
34+
Reservation reservation = runInstancesResult.getReservation();
35+
List<Instance> instances = reservation.getInstances();
36+
37+
// there will be exactly one instance in this list, otherwise
38+
// runInstances() would have thrown an exception
39+
return instances.get(0);
40+
}
41+
42+
private ExecutionContext executionContext;
43+
private CircuitBreaker circuitBreaker;
44+
45+
public Future<Instance> startInstanceAsync(AWSCredentials credentials) {
46+
Future<Instance> f = circuitBreaker.callWithCircuitBreaker(() ->
47+
Futures.future(() -> startInstance(credentials), executionContext));
48+
PartialFunction<Throwable, Future<Instance>> recovery =
49+
new PFBuilder<Throwable, Future<Instance>>()
50+
.match(AmazonClientException.class,
51+
ex -> ex.isRetryable(),
52+
ex -> startInstanceAsync(credentials))
53+
.build();
54+
return f.recoverWith(recovery, executionContext);
55+
}
56+
57+
}

project/Build.scala

+3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ object Build {
88
val akkaContrib = "com.typesafe.akka" %% "akka-contrib" % akkaVersion
99
val akkaDData = "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
1010

11+
val amazonAWS = "com.amazonaws" % "aws-java-sdk" % "1.10.14"
12+
1113
val sbtIO = "org.scala-sbt" %% "io" % "0.13.8"
1214

1315
val scalaAsync = "org.scala-lang.modules" %% "scala-async" % "0.9.2"
16+
val scalaJava8 = "org.scala-lang.modules" %% "scala-java8-compat" % "0.6.0" exclude("org.scala-lang.modules", "fngen_2.11")
1417

1518
val playJson = "com.typesafe.play" %% "play-json" % "2.4.0"
1619

0 commit comments

Comments
 (0)