Skip to content

Commit e7fef59

Browse files
committed
add saga etc.
1 parent 0f38a0c commit e7fef59

File tree

8 files changed

+645
-4
lines changed

8 files changed

+645
-4
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,4 @@ project/boot/
113113
.cache-main
114114

115115
theDatabase*
116+
journal/

chapter12/src/main/scala/com/reactivedesignpatterns/chapter12/RateLimiter.scala

+28-1
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,31 @@ class RateLimiter(requests: Int, period: FiniteDuration) {
2626
block
2727
}
2828
}
29-
}
29+
}
30+
31+
object CircuitBreaker {
32+
private object StorageFailed extends RuntimeException
33+
private def sendToStorage(job: Job): Future[StorageStatus] = {
34+
// make an asynchronous request to the storage subsystem
35+
val f: Future[StorageStatus] = ???
36+
// map storage failures to Future failures to alert the breaker
37+
f.map {
38+
case StorageStatus.Failed => throw StorageFailed
39+
case other => other
40+
}
41+
}
42+
private val breaker = CircuitBreaker(
43+
system.scheduler, // used for scheduling timeouts
44+
5, // number of failures in a row when it trips
45+
300.millis, // timeout for each service call
46+
30.seconds // time before trying to close after tripping
47+
)
48+
def persist(job: Job): Future[StorageStatus] =
49+
breaker
50+
.withCircuitBreaker(sendToStorage(job))
51+
.recover {
52+
case StorageFailed => StorageStatus.Failed
53+
case _: TimeoutException => StorageStatus.Unknown
54+
case _: CircuitBreakerOpenException => StorageStatus.Failed
55+
}
56+
}

chapter14/build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
import Build._
22

3-
libraryDependencies ++= Seq(akkaActor, akkaTyped, junit)
3+
libraryDependencies ++= Seq(akkaActor, akkaTyped, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter14
5+
6+
import akka.typed._
7+
import akka.typed.ScalaDSL._
8+
import akka.typed.AskPattern._
9+
import scala.concurrent.duration._
10+
import akka.util.Timeout
11+
import akka.pattern.AskTimeoutException
12+
13+
object Aggregator {
14+
15+
case class GetTheme(user: String, replyTo: ActorRef[ThemeResult])
16+
case class ThemeResult(css: String)
17+
18+
case class GetPersonalNews(user: String, replyTo: ActorRef[PersonalNewsResult])
19+
case class PersonalNewsResult(news: List[String])
20+
21+
case class GetTopNews(replyTo: ActorRef[TopNewsResult])
22+
case class TopNewsResult(news: List[String])
23+
24+
case class GetFrontPage(user: String, replyTo: ActorRef[FrontPageResult])
25+
case class FrontPageResult(user: String, css: String, news: List[String])
26+
27+
case class GetOverride(replyTo: ActorRef[OverrideResult])
28+
sealed trait OverrideResult
29+
case object NoOverride extends OverrideResult
30+
case class Override(css: String, news: List[String]) extends OverrideResult
31+
32+
class FrontPageResultBuilder(user: String) {
33+
private var css: Option[String] = None
34+
private var personalNews: Option[List[String]] = None
35+
private var topNews: Option[List[String]] = None
36+
37+
def addCSS(css: String): Unit = this.css = Option(css)
38+
def addPersonalNews(news: List[String]): Unit = this.personalNews = Option(news)
39+
def addTopNews(news: List[String]): Unit = this.topNews = Option(news)
40+
41+
def timeout(): Unit = {
42+
if (css.isEmpty) css = Some("default.css")
43+
if (personalNews.isEmpty) personalNews = Some(Nil)
44+
if (topNews.isEmpty) topNews = Some(Nil)
45+
}
46+
47+
def isComplete: Boolean = css.isDefined && personalNews.isDefined && topNews.isDefined
48+
49+
def result: FrontPageResult = {
50+
val topSet = topNews.get.toSet
51+
val allNews = topNews.get ::: personalNews.get.filterNot(topSet.contains)
52+
FrontPageResult(user, css.get, allNews)
53+
}
54+
}
55+
56+
private def pf(p: PartialFunction[AnyRef, Unit]): p.type = p
57+
58+
def frontPage(themes: ActorRef[GetTheme], personalNews: ActorRef[GetPersonalNews], topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
59+
ContextAware { ctx =>
60+
Static {
61+
case GetFrontPage(user, replyTo) =>
62+
val childRef = ctx.spawnAnonymous(Props {
63+
val builder = new FrontPageResultBuilder(user)
64+
Partial[AnyRef](
65+
pf {
66+
case ThemeResult(css) => builder.addCSS(css)
67+
case PersonalNewsResult(news) => builder.addPersonalNews(news)
68+
case TopNewsResult(news) => builder.addTopNews(news)
69+
case ReceiveTimeout => builder.timeout()
70+
} andThen { _ =>
71+
if (builder.isComplete) {
72+
replyTo ! builder.result
73+
Stopped
74+
} else Same
75+
})
76+
})
77+
themes ! GetTheme(user, childRef)
78+
personalNews ! GetPersonalNews(user, childRef)
79+
topNews ! GetTopNews(childRef)
80+
ctx.schedule(1.second, childRef, ReceiveTimeout)
81+
}
82+
}
83+
84+
def futureFrontPage(themes: ActorRef[GetTheme], personalNews: ActorRef[GetPersonalNews], topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
85+
ContextAware { ctx =>
86+
import ctx.executionContext
87+
implicit val timeout = Timeout(1.second)
88+
89+
Static {
90+
case GetFrontPage(user, replyTo) =>
91+
val cssFuture =
92+
(themes ? (GetTheme(user, _: ActorRef[ThemeResult])))
93+
.map(_.css)
94+
.recover {
95+
case _: AskTimeoutException => "default.css"
96+
}
97+
val personalNewsFuture =
98+
(personalNews ? (GetPersonalNews(user, _: ActorRef[PersonalNewsResult])))
99+
.map(_.news)
100+
.recover {
101+
case _: AskTimeoutException => Nil
102+
}
103+
val topNewsFuture =
104+
(topNews ? (GetTopNews(_: ActorRef[TopNewsResult])))
105+
.map(_.news)
106+
.recover {
107+
case _: AskTimeoutException => Nil
108+
}
109+
for {
110+
css <- cssFuture
111+
personalNews <- personalNewsFuture
112+
topNews <- topNewsFuture
113+
} {
114+
val topSet = topNews.toSet
115+
val allNews = topNews ::: personalNews.filterNot(topSet.contains)
116+
replyTo ! FrontPageResult(user, css, allNews)
117+
}
118+
}
119+
}
120+
121+
def futureFrontPageWithOverride(themes: ActorRef[GetTheme], personalNews: ActorRef[GetPersonalNews],
122+
topNews: ActorRef[GetTopNews], overrides: ActorRef[GetOverride]): Behavior[GetFrontPage] =
123+
ContextAware { ctx =>
124+
import ctx.executionContext
125+
implicit val timeout = Timeout(1.second)
126+
127+
Static {
128+
case GetFrontPage(user, replyTo) =>
129+
val cssFuture =
130+
(themes ? (GetTheme(user, _: ActorRef[ThemeResult])))
131+
.map(_.css)
132+
.recover {
133+
case _: AskTimeoutException => "default.css"
134+
}
135+
val personalNewsFuture =
136+
(personalNews ? (GetPersonalNews(user, _: ActorRef[PersonalNewsResult])))
137+
.map(_.news)
138+
.recover {
139+
case _: AskTimeoutException => Nil
140+
}
141+
val topNewsFuture =
142+
(topNews ? (GetTopNews(_: ActorRef[TopNewsResult])))
143+
.map(_.news)
144+
.recover {
145+
case _: AskTimeoutException => Nil
146+
}
147+
val overrideFuture =
148+
(overrides ? (GetOverride(_: ActorRef[OverrideResult])))
149+
.recover {
150+
case _: AskTimeoutException => NoOverride
151+
}
152+
for {
153+
css <- cssFuture
154+
personalNews <- personalNewsFuture
155+
topNews <- topNewsFuture
156+
ovr <- overrideFuture
157+
} ovr match {
158+
case NoOverride =>
159+
val topSet = topNews.toSet
160+
val allNews = topNews ::: personalNews.filterNot(topSet.contains)
161+
replyTo ! FrontPageResult(user, css, allNews)
162+
case other => // nothing to do here
163+
}
164+
for {
165+
ovr <- overrideFuture
166+
} ovr match {
167+
case NoOverride => // nothing to do here
168+
case Override(css, news) =>
169+
replyTo ! FrontPageResult(user, css, news)
170+
}
171+
}
172+
}
173+
174+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter14
5+
6+
import akka.typed._
7+
import akka.typed.ScalaDSL._
8+
import akka.typed.AskPattern._
9+
import java.util.UUID
10+
import akka.event.Logging
11+
import scala.concurrent.Future
12+
import akka.util.Timeout
13+
import scala.concurrent.duration._
14+
import akka.pattern.AskTimeoutException
15+
16+
object AskPattern {
17+
18+
sealed trait MyCommands
19+
case class StartVerificationProcess(userEmail: String, replyTo: ActorRef[VerificationProcessResponse]) extends MyCommands
20+
private case class MyEmailResult(correlationID: UUID, status: StatusCode, explanation: String) extends MyCommands
21+
22+
sealed trait VerificationProcessResponse
23+
case class VerificationProcessStarted(userEmail: String) extends VerificationProcessResponse
24+
case class VerificationProcessFailed(userEmail: String) extends VerificationProcessResponse
25+
26+
case class SendEmail(sender: String, recipients: List[String],
27+
body: String, correlationID: UUID,
28+
replyTo: ActorRef[SendEmailResult])
29+
30+
case class SendEmailResult(correlationID: UUID, status: StatusCode,
31+
explanation: String)
32+
33+
sealed trait StatusCode
34+
object StatusCode {
35+
case object OK extends StatusCode
36+
case object Failed extends StatusCode
37+
}
38+
39+
def withoutAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
40+
ContextAware[MyCommands] { ctx =>
41+
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
42+
var statusMap = Map.empty[UUID, (String, ActorRef[VerificationProcessResponse])]
43+
val adapter = ctx.spawnAdapter((s: SendEmailResult) => MyEmailResult(s.correlationID, s.status, s.explanation))
44+
45+
Static {
46+
case StartVerificationProcess(userEmail, replyTo) =>
47+
val corrID = UUID.randomUUID()
48+
val request = SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, adapter)
49+
emailGateway ! request
50+
statusMap += corrID -> (userEmail, replyTo)
51+
ctx.schedule(5.seconds, ctx.self, MyEmailResult(corrID, StatusCode.Failed, "timeout"))
52+
case MyEmailResult(corrID, status, expl) =>
53+
statusMap.get(corrID) match {
54+
case None =>
55+
log.error("received SendEmailResult for unknown correlation ID {}", corrID)
56+
case Some((userEmail, replyTo)) =>
57+
status match {
58+
case StatusCode.OK =>
59+
log.debug("successfully started the verification process for {}", userEmail)
60+
replyTo ! VerificationProcessStarted(userEmail)
61+
case StatusCode.Failed =>
62+
log.info("failed to start the verification process for {}: {}", userEmail, expl)
63+
replyTo ! VerificationProcessFailed(userEmail)
64+
}
65+
statusMap -= corrID
66+
}
67+
}
68+
}.narrow[StartVerificationProcess]
69+
70+
def withChildActor(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
71+
ContextAware { ctx: ActorContext[StartVerificationProcess] =>
72+
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
73+
74+
Static {
75+
case StartVerificationProcess(userEmail, replyTo) =>
76+
val corrID = UUID.randomUUID()
77+
val childActor = ctx.spawnAnonymous(Props(FullTotal[SendEmailResult] {
78+
case Sig(ctx, PreStart) =>
79+
ctx.setReceiveTimeout(5.seconds)
80+
Same
81+
case Sig(_, ReceiveTimeout) =>
82+
log.warning("verification process initiation timed out for {}", userEmail)
83+
replyTo ! VerificationProcessFailed(userEmail)
84+
Stopped
85+
case Msg(_, SendEmailResult(`corrID`, StatusCode.OK, _)) =>
86+
log.debug("successfully started the verification process for {}", userEmail)
87+
replyTo ! VerificationProcessStarted(userEmail)
88+
Stopped
89+
case Msg(_, SendEmailResult(`corrID`, StatusCode.Failed, explanation)) =>
90+
log.info("failed to start the verification process for {}: {}", userEmail, explanation)
91+
replyTo ! VerificationProcessFailed(userEmail)
92+
Stopped
93+
case Msg(_, SendEmailResult(wrongID, _, _)) =>
94+
log.error("received wrong SendEmailResult for corrID {}", corrID)
95+
Same
96+
}))
97+
val request = SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, childActor)
98+
emailGateway ! request
99+
}
100+
}
101+
102+
def withAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
103+
ContextAware { ctx =>
104+
val log = Logging(ctx.system.eventStream, "VerificationProcessManager")
105+
implicit val timeout = Timeout(5.seconds)
106+
import ctx.executionContext
107+
108+
Static {
109+
case StartVerificationProcess(userEmail, replyTo) =>
110+
val corrID = UUID.randomUUID()
111+
val response: Future[SendEmailResult] =
112+
emailGateway ? (SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, _))
113+
response.map {
114+
case SendEmailResult(`corrID`, StatusCode.OK, _) =>
115+
log.debug("successfully started the verification process for {}", userEmail)
116+
VerificationProcessStarted(userEmail)
117+
case SendEmailResult(`corrID`, StatusCode.Failed, explanation) =>
118+
log.info("failed to start the verification process for {}: {}", userEmail, explanation)
119+
VerificationProcessFailed(userEmail)
120+
case SendEmailResult(wrongID, _, _) =>
121+
log.error("received wrong SendEmailResult for corrID {}", corrID)
122+
VerificationProcessFailed(userEmail)
123+
}.recover {
124+
case _: AskTimeoutException =>
125+
log.warning("verification process initiation timed out for {}", userEmail)
126+
VerificationProcessFailed(userEmail)
127+
}.foreach(result => replyTo ! result)
128+
}
129+
}
130+
131+
private def constructBody(userEmail: String, corrID: UUID): String = ???
132+
133+
}

0 commit comments

Comments
 (0)