Skip to content

Commit d976c5f

Browse files
committed
2 parents dfd9b35 + 32c36d3 commit d976c5f

File tree

4 files changed

+208
-28
lines changed

4 files changed

+208
-28
lines changed

chapter04/src/main/scala/com/reactivedesignpatterns/chapter4/TranslationService.scala

+75-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,85 @@ package com.reactivedesignpatterns.chapter4
22

33
import scala.concurrent.ExecutionContext
44
import scala.concurrent.Future
5+
import scala.concurrent.duration._
6+
import akka.actor.ActorRef
7+
import akka.actor.Actor
8+
import akka.util.Timeout
9+
import com.reactivedesignpatterns.Defaults._
10+
import java.util.concurrent.TimeoutException
11+
import akka.actor.Props
512

613
class TranslationService {
714
import ExecutionContext.Implicits.global
8-
15+
916
def translate(input: String): Future[String] = Future { Thread.sleep(100); "How are you?" }
10-
17+
1118
def translate(input: String, ec: ExecutionContext): Future[String] =
1219
Future { Thread.sleep(100); "How are you?" }(ec)
1320
}
21+
22+
/**
23+
* Another implementation that is based on Actors; this is used in the example
24+
* which shows how to assert the absence of indirectly invoked messages by way
25+
* of a protocol adapter test for this translation service.
26+
*/
27+
object TranslationService {
28+
29+
/**
30+
* Simplistic version 1 of the protocol: the reply will just be a String.
31+
*/
32+
case class TranslateV1(query: String, replyTo: ActorRef)
33+
34+
/**
35+
* Implementation of the TranslateV1 protocol.
36+
*/
37+
private class TranslatorV1 extends Actor {
38+
def receive = {
39+
case TranslateV1(query, replyTo) =>
40+
if (query == "sv:en:Hur mår du?") {
41+
replyTo ! "How are you?"
42+
} else {
43+
replyTo ! s"error:cannot translate '$query'"
44+
}
45+
}
46+
}
47+
48+
def propsV1: Props = Props(new TranslatorV1)
49+
50+
/**
51+
* More advanced version 2 of the protocol with proper reply types.
52+
* Languages are communicated as Strings for brevity, in a real project
53+
* these would be modeled as a proper Language type (statically known
54+
* enumeration or based on runtime registration of values).
55+
*/
56+
case class TranslateV2(phrase: String, inputLanguage: String, outputLanguage: String, replyTo: ActorRef)
57+
58+
sealed trait TranslationResponseV2
59+
case class TranslationV2(inputPhrase: String, outputPhrase: String, inputLanguage: String, outputLanguage: String)
60+
case class TranslationErrorV2(inputPhrase: String, inputLanguage: String, outputLanguage: String, errorMessage: String)
61+
62+
/**
63+
* Implementation of the TranslateV2 protocol based on TranslatorV1.
64+
*/
65+
private class TranslatorV2(v1: ActorRef) extends Actor {
66+
implicit val timeout = Timeout(5.seconds)
67+
import context.dispatcher
68+
69+
def receive = {
70+
case TranslateV2(phrase, in, out, replyTo) =>
71+
v1 ? (TranslateV1(s"$in:$out:$phrase", _)) collect {
72+
case str: String =>
73+
if (str.startsWith("error:")) {
74+
TranslationErrorV2(phrase, in, out, str.substring(6))
75+
} else {
76+
TranslationV2(phrase, str, in, out)
77+
}
78+
} recover {
79+
case _: TimeoutException =>
80+
TranslationErrorV2(phrase, in, out, "timeout while talking to V1 back-end")
81+
} pipeTo replyTo
82+
}
83+
}
84+
85+
def propsV2(v1: ActorRef): Props = Props(new TranslatorV2(v1))
86+
}

chapter04/src/test/scala/com/reactivedesignpatterns/chapter4/EchoServiceSpec.scala

+16-20
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package com.reactivedesignpatterns.chapter4
33
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
44
import scala.util.Try
55
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
6-
import com.reactivedesignpatterns.Scoped.scoped
76
import com.reactivedesignpatterns.Defaults._
87
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, actorRef2Scala }
98
import akka.testkit.TestProbe
@@ -39,16 +38,13 @@ object EchoServiceSpec {
3938
timings: List[FiniteDuration]): Receive = {
4039
case TimedResponse(Response(r), d) =>
4140
val start = outstanding(r)
42-
scoped(
43-
outstanding - r + sendRequest(echo, receiver),
44-
(d - start) :: timings,
45-
remaining - 1
46-
) { (outstanding, timings, remaining) =>
47-
if (remaining > 0)
48-
context.become(running(reportTo, echo, remaining, receiver, outstanding, timings))
49-
else
50-
context.become(finishing(reportTo, outstanding, timings))
51-
}
41+
val newOutstanding = outstanding - r + sendRequest(echo, receiver)
42+
val newTimings = (d - start) :: timings
43+
val newRemaining = remaining - 1
44+
if (newRemaining > 0)
45+
context.become(running(reportTo, echo, newRemaining, receiver, newOutstanding, newTimings))
46+
else
47+
context.become(finishing(reportTo, newOutstanding, newTimings))
5248
case AbortSLATest =>
5349
context.stop(self)
5450
reportTo ! SLAResponse(timings, outstanding)
@@ -57,12 +53,12 @@ object EchoServiceSpec {
5753
def finishing(reportTo: ActorRef, outstanding: Map[String, Timestamp], timings: List[FiniteDuration]): Receive = {
5854
case TimedResponse(Response(r), d) =>
5955
val start = outstanding(r)
60-
scoped(outstanding - r, (d - start) :: timings) { (outstanding, timings) =>
61-
if (outstanding.isEmpty) {
62-
context.stop(self)
63-
reportTo ! SLAResponse(timings, outstanding)
64-
} else context.become(finishing(reportTo, outstanding, timings))
65-
}
56+
val newOutstanding = outstanding - r
57+
val newTimings = (d - start) :: timings
58+
if (newOutstanding.isEmpty) {
59+
context.stop(self)
60+
reportTo ! SLAResponse(newTimings, newOutstanding)
61+
} else context.become(finishing(reportTo, newOutstanding, newTimings))
6662
case AbortSLATest =>
6763
context.stop(self)
6864
reportTo ! SLAResponse(timings, outstanding)
@@ -191,9 +187,9 @@ akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 3
191187
}
192188

193189
}
194-
190+
195191
"An EchoService (with LatencyTestSupport)" should {
196-
192+
197193
"keep its SLA" in {
198194
implicit val timeout = Timeout(5.seconds)
199195
import system.dispatcher
@@ -208,7 +204,7 @@ akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 3
208204
lat.failureCount should be(0)
209205
lat.quantile(0.99) should be < 10.milliseconds
210206
}
211-
207+
212208
}
213209

214210
}

chapter04/src/test/scala/com/reactivedesignpatterns/chapter4/TranslationServiceSpec.scala

+108-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,51 @@ import scala.concurrent.ExecutionContext
88
import scala.concurrent.Await
99
import scala.concurrent.duration._
1010
import org.scalatest.concurrent.Eventually
11+
import org.scalatest.BeforeAndAfterAll
12+
import akka.actor.ActorSystem
13+
import akka.testkit.TestProbe
14+
import akka.actor.Actor
15+
import akka.actor.ActorRef
16+
import akka.actor.Props
1117

12-
class TranslationServiceSpec extends WordSpec with Matchers with Eventually {
18+
object TranslationServiceSpec {
19+
import TranslationService._
20+
21+
case object ExpectNominal
22+
case object ExpectError
23+
case class Unexpected(msg: Any)
24+
25+
class MockV1(reporter: ActorRef) extends Actor {
26+
def receive = initial
27+
28+
override def unhandled(msg: Any) = {
29+
reporter ! Unexpected(msg)
30+
}
31+
32+
val initial: Receive = {
33+
case ExpectNominal => context.become(expectingNominal)
34+
case ExpectError => context.become(expectingError)
35+
}
36+
37+
val expectingNominal: Receive = {
38+
case TranslateV1("sv:en:Hur mår du?", replyTo) =>
39+
replyTo ! "How are you?"
40+
context.become(initial)
41+
}
42+
43+
val expectingError: Receive = {
44+
case TranslateV1(other, replyTo) =>
45+
replyTo ! s"error:cannot parse input '$other'"
46+
context.become(initial)
47+
}
48+
}
49+
50+
def mockV1props(reporter: ActorRef): Props = Props(new MockV1(reporter))
51+
52+
}
53+
54+
class TranslationServiceSpec extends WordSpec with Matchers with Eventually with BeforeAndAfterAll {
55+
import TranslationServiceSpec._
1356

1457
"A TranslationService" should {
1558

@@ -22,15 +65,15 @@ class TranslationServiceSpec extends WordSpec with Matchers with Eventually {
2265
val ec = SynchronousEventLoop
2366
tr.translate(input, ec).value.get should be(Success(output))
2467
}
25-
68+
2669
"using Await.result" in {
2770
val tr = new TranslationService
2871
val input = "Hur mår du?"
2972
val output = "How are you?"
3073
val future = tr.translate(input)
3174
Await.result(future, 1.second) should be(output)
3275
}
33-
76+
3477
"using eventually()" in {
3578
val tr = new TranslationService
3679
val input = "Hur mår du?"
@@ -55,4 +98,66 @@ class TranslationServiceSpec extends WordSpec with Matchers with Eventually {
5598

5699
}
57100

101+
implicit val system = ActorSystem("TranslationServiceSpec")
102+
override def afterAll(): Unit = system.shutdown()
103+
104+
"A TranslationService version adapter" should {
105+
import TranslationService._
106+
107+
"translate requests" in {
108+
val v1 = TestProbe()
109+
val v2 = system.actorOf(propsV2(v1.ref))
110+
val client = TestProbe()
111+
112+
// initiate a request to the adapter
113+
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)
114+
115+
// verify that the adapter asks the V1 service back-end
116+
val req1 = v1.expectMsgType[TranslateV1]
117+
req1.query should be("sv:en:Hur mår du?")
118+
119+
// initiate a reply
120+
req1.replyTo ! "How are you?"
121+
122+
// verify that the adapter transforms it correctly
123+
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))
124+
125+
// now verify translation errors
126+
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
127+
val req2 = v1.expectMsgType[TranslateV1]
128+
// this implicitly verifies that no other communication happened with V1
129+
req2.query should be("sv:en:Hur är läget?")
130+
req2.replyTo ! "error:cannot parse input 'sv:en:Hur är läget?'"
131+
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))
132+
133+
v1.expectNoMsg(1.second)
134+
}
135+
136+
"translate requests with async error reporting" in {
137+
val asyncErrors = TestProbe()
138+
val v1 = system.actorOf(mockV1props(asyncErrors.ref))
139+
val v2 = system.actorOf(propsV2(v1))
140+
val client = TestProbe()
141+
142+
// initiate a request to the adapter
143+
v1 ! ExpectNominal
144+
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)
145+
146+
// verify that the adapter transforms it correctly
147+
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))
148+
149+
// non-blocking check for async errors
150+
asyncErrors.expectNoMsg(0.seconds)
151+
152+
// now verify translation errors
153+
v1 ! ExpectError
154+
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
155+
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))
156+
157+
// final check for async errors
158+
asyncErrors.expectNoMsg(1.second)
159+
}
160+
161+
}
162+
58163
}

common/src/main/scala/com/reactivedesignpatterns/Defaults.scala

+9-3
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@ import scala.concurrent.duration._
44
import akka.actor.ActorRef
55
import scala.concurrent.Future
66
import akka.util.Timeout
7+
import scala.util.Success
78

89
object Defaults {
9-
10+
1011
val Timestamp = Deadline
1112
type Timestamp = Deadline
12-
13+
1314
val SynchronousEventLoop = scala.concurrent.rdpextras.SynchronousEventLoop
1415

1516
implicit class AskableActorRef(val ref: ActorRef) extends AnyVal {
1617
def ?(f: ActorRef => Any)(implicit timeout: Timeout): Future[Any] = akka.rdpextras.AskPattern.ask(ref, timeout, f)
1718
}
18-
19+
20+
implicit class PipeTo[T](val f: Future[T]) extends AnyVal {
21+
def pipeTo(ref: ActorRef)(implicit self: ActorRef = ActorRef.noSender): Future[T] =
22+
f.andThen { case Success(msg) => ref ! msg }(SynchronousEventLoop)
23+
}
24+
1925
}

0 commit comments

Comments
 (0)