Skip to content

Commit e416973

Browse files
committed
update Scala/Akka and make compile, #3
1 parent b59708e commit e416973

File tree

8 files changed

+33
-52
lines changed

8 files changed

+33
-52
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
organization in ThisBuild := "com.reactivedesignpatterns"
22
version in ThisBuild := "0.1-SNAPSHOT"
3-
scalaVersion in ThisBuild := "2.11.6"
3+
scalaVersion in ThisBuild := "2.11.8"
44

55
lazy val common = project
66

chapter13/src/main/scala/com/reactivedesignpatterns/chapter13/RateLimiter.scala

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,3 @@ class RateLimiter(requests: Int, period: FiniteDuration) {
2727
}
2828
}
2929
}
30-
31-
object CircuitBreaker {
32-
private object StorageFailed extends RuntimeException
33-
private def sendToStorage(job: Job): Future[StorageStatus] = {
34-
// make an asynchronous request to the storage subsystem
35-
val f: Future[StorageStatus] = ???
36-
// map storage failures to Future failures to alert the breaker
37-
f.map {
38-
case StorageStatus.Failed => throw StorageFailed
39-
case other => other
40-
}
41-
}
42-
private val breaker = CircuitBreaker(
43-
system.scheduler, // used for scheduling timeouts
44-
5, // number of failures in a row when it trips
45-
300.millis, // timeout for each service call
46-
30.seconds // time before trying to close after tripping
47-
)
48-
def persist(job: Job): Future[StorageStatus] =
49-
breaker
50-
.withCircuitBreaker(sendToStorage(job))
51-
.recover {
52-
case StorageFailed => StorageStatus.Failed
53-
case _: TimeoutException => StorageStatus.Unknown
54-
case _: CircuitBreakerOpenException => StorageStatus.Failed
55-
}
56-
}

chapter14/src/main/java/com/reactivedesignpatterns/chapter14/ComplexCommand.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,18 @@
66
import java.util.Collection;
77
import java.util.Collections;
88
import java.util.Set;
9+
import java.util.concurrent.CompletionStage;
910
import java.util.stream.Stream;
1011

1112
import javax.script.Invocable;
1213
import javax.script.ScriptEngine;
1314
import javax.script.ScriptEngineManager;
1415

16+
import akka.NotUsed;
1517
import akka.japi.Pair;
1618
import akka.japi.function.*;
1719
import akka.stream.javadsl.*;
18-
import akka.stream.scaladsl.Keep;
20+
import akka.stream.javadsl.Keep;
1921
import scala.concurrent.Future;
2022
import scala.runtime.BoxedUnit;
2123

@@ -151,10 +153,10 @@ public T apply(T arg0, DataElement arg1) throws Exception {
151153

152154
class Inject<T> implements Function<DataElement, DataElement> {
153155
private static final long serialVersionUID = 1L;
154-
public final RunnableGraph<Future<T>> value;
156+
public final RunnableGraph<CompletionStage<T>> value;
155157
public final String fieldname;
156158

157-
public Inject(RunnableGraph<Future<T>> value, String fieldname) {
159+
public Inject(RunnableGraph<CompletionStage<T>> value, String fieldname) {
158160
this.value = value;
159161
this.fieldname = fieldname;
160162
}
@@ -198,10 +200,10 @@ public Set<T> apply(Set<T> arg0, DataElement arg1) throws Exception {
198200
}
199201

200202
public static void akkaStreamDSL() {
201-
RunnableGraph<Future<Long>> p =
203+
RunnableGraph<CompletionStage<Long>> p =
202204
Source.<DataElement>empty()
203205
.filter(new InRange("year", 1950, 1960))
204-
.toMat(Sink.fold(0L, new Median<Long>("price")), Keep.<BoxedUnit, Long>right());
206+
.toMat(Sink.fold(0L, new Median<Long>("price")), Keep.<NotUsed, CompletionStage<Long>>right());
205207

206208
Source.<DataElement>empty()
207209
.map(new Inject<Long>(p, "p"))

chapter15/src/main/scala/com/reactivedesignpatterns/chapter15/Aggregator.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import akka.typed.AskPattern._
99
import scala.concurrent.duration._
1010
import akka.util.Timeout
1111
import akka.pattern.AskTimeoutException
12+
import akka.actor.ReceiveTimeout
1213

1314
object Aggregator {
1415

@@ -59,7 +60,7 @@ object Aggregator {
5960
ContextAware { ctx =>
6061
Static {
6162
case GetFrontPage(user, replyTo) =>
62-
val childRef = ctx.spawnAnonymous(Props {
63+
val childRef = ctx.spawnAnonymous(Deferred { () =>
6364
val builder = new FrontPageResultBuilder(user)
6465
Partial[AnyRef](
6566
pf {
@@ -85,6 +86,7 @@ object Aggregator {
8586
ContextAware { ctx =>
8687
import ctx.executionContext
8788
implicit val timeout = Timeout(1.second)
89+
implicit val scheduler = ctx.system.scheduler
8890

8991
Static {
9092
case GetFrontPage(user, replyTo) =>
@@ -123,6 +125,7 @@ object Aggregator {
123125
ContextAware { ctx =>
124126
import ctx.executionContext
125127
implicit val timeout = Timeout(1.second)
128+
implicit val scheduler = ctx.system.scheduler
126129

127130
Static {
128131
case GetFrontPage(user, replyTo) =>

chapter15/src/main/scala/com/reactivedesignpatterns/chapter15/AskPattern.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ object AskPattern {
2727
body: String, correlationID: UUID,
2828
replyTo: ActorRef[SendEmailResult])
2929

30+
sealed trait Result
31+
case object ReceiveTimeout extends Result
3032
case class SendEmailResult(correlationID: UUID, status: StatusCode,
31-
explanation: String)
33+
explanation: String) extends Result
3234

3335
sealed trait StatusCode
3436
object StatusCode {
@@ -38,7 +40,7 @@ object AskPattern {
3840

3941
def withoutAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
4042
ContextAware[MyCommands] { ctx =>
41-
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
43+
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
4244
var statusMap = Map.empty[UUID, (String, ActorRef[VerificationProcessResponse])]
4345
val adapter = ctx.spawnAdapter((s: SendEmailResult) => MyEmailResult(s.correlationID, s.status, s.explanation))
4446

@@ -69,16 +71,16 @@ object AskPattern {
6971

7072
def withChildActor(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
7173
ContextAware { ctx: ActorContext[StartVerificationProcess] =>
72-
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
74+
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
7375

7476
Static {
7577
case StartVerificationProcess(userEmail, replyTo) =>
7678
val corrID = UUID.randomUUID()
77-
val childActor = ctx.spawnAnonymous(Props(FullTotal[SendEmailResult] {
79+
val childActor = ctx.spawnAnonymous(FullTotal[Result] {
7880
case Sig(ctx, PreStart) =>
79-
ctx.setReceiveTimeout(5.seconds)
81+
ctx.setReceiveTimeout(5.seconds, ReceiveTimeout)
8082
Same
81-
case Sig(_, ReceiveTimeout) =>
83+
case Msg(_, ReceiveTimeout) =>
8284
log.warning("verification process initiation timed out for {}", userEmail)
8385
replyTo ! VerificationProcessFailed(userEmail)
8486
Stopped
@@ -93,17 +95,18 @@ object AskPattern {
9395
case Msg(_, SendEmailResult(wrongID, _, _)) =>
9496
log.error("received wrong SendEmailResult for corrID {}", corrID)
9597
Same
96-
}))
98+
})
9799
val request = SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, childActor)
98100
emailGateway ! request
99101
}
100102
}
101103

102104
def withAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
103105
ContextAware { ctx =>
104-
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
106+
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
105107
implicit val timeout = Timeout(5.seconds)
106108
import ctx.executionContext
109+
implicit val scheduler = ctx.system.scheduler
107110

108111
Static {
109112
case StartVerificationProcess(userEmail, replyTo) =>

chapter15/src/main/scala/com/reactivedesignpatterns/chapter15/BusinessHandshake.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ object PersistentBusinessHandshake extends App {
142142
var deliveryId: Long = 0
143143
deliver(alice)(id => { deliveryId = id; ChangeBudget(-amount, self, persistenceId) })
144144

145-
LoggingReceive {
145+
LoggingReceive({
146146
case ChangeBudgetDone =>
147147
persist(AliceConfirmedChange(deliveryId)) { ev =>
148148
confirmDelivery(ev.deliveryId)
@@ -153,7 +153,7 @@ object PersistentBusinessHandshake extends App {
153153
confirmDelivery(ev.deliveryId)
154154
context.stop(self)
155155
}
156-
}
156+
}: Receive)
157157
}
158158

159159
def talkToBob() = {

chapter15/src/main/scala/com/reactivedesignpatterns/chapter15/RequestResponseTypedActors.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ object RequestResponseTypedActors {
3030
}
3131

3232
def main(args: Array[String]): Unit = {
33-
ActorSystem("ReqResTyped", Props(ContextAware[Unit] { ctx =>
34-
val res = ctx.spawn(Props(responder), "responder")
35-
val req = ctx.watch(ctx.spawn(Props(requester(res)), "requester"))
33+
ActorSystem("ReqResTyped", ContextAware[Unit] { ctx =>
34+
val res = ctx.spawn(responder, "responder")
35+
val req = ctx.watch(ctx.spawn(requester(res), "requester"))
3636
Full {
3737
case Sig(ctx, Terminated(`req`)) => Stopped
3838
}
39-
}))
39+
})
4040
}
41-
}
41+
}

project/Build.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
import sbt._
22

33
object Build {
4-
val akkaVersion = "2.4.1"
4+
val akkaVersion = "2.4.17"
55

66
val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
77
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
88
val akkaContrib = "com.typesafe.akka" %% "akka-contrib" % akkaVersion
99
val akkaSharding= "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
1010
val akkaDData = "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
11-
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1"
11+
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
1212
val akkaTyped = "com.typesafe.akka" %% "akka-typed-experimental" % akkaVersion
1313
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % akkaVersion
1414
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query-experimental" % akkaVersion
1515

1616
val levelDb = "org.iq80.leveldb" % "leveldb" % "0.7"
1717

18-
val amazonAWS = "com.amazonaws" % "aws-java-sdk" % "1.10.14"
18+
val amazonAWS = "com.amazonaws" % "aws-java-sdk" % "1.11.109"
1919

2020
val sbtIO = "org.scala-sbt" %% "io" % "0.13.8"
2121

0 commit comments

Comments
 (0)