Skip to content

Commit 28f675d

Browse files
committed
finish chapter 15
1 parent 681de03 commit 28f675d

File tree

6 files changed

+576
-16
lines changed

6 files changed

+576
-16
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter15
5+
6+
import java.math.MathContext
7+
import java.math.RoundingMode
8+
import java.util.concurrent.ThreadLocalRandom
9+
import java.util.concurrent.TimeoutException
10+
import scala.collection.immutable.Queue
11+
import scala.concurrent.Future
12+
import scala.concurrent.duration._
13+
import com.typesafe.config.ConfigFactory
14+
import akka.actor._
15+
import akka.pattern.extended.ask
16+
import akka.util.Timeout
17+
import akka.stream.scaladsl._
18+
import akka.stream.ActorMaterializer
19+
20+
object DropPattern {
21+
22+
case class Job(id: Long, input: Int, replyTo: ActorRef)
23+
case class JobRejected(id: Long)
24+
case class JobResult(id: Long, report: BigDecimal)
25+
26+
case class WorkRequest(worker: ActorRef, items: Int)
27+
case class DummyWork(count: Int)
28+
29+
class Manager extends Actor {
30+
31+
var workQueue = Queue.empty[Job]
32+
var requestQueue = Queue.empty[WorkRequest]
33+
34+
val queueThreshold = 1000
35+
val dropThreshold = 1500
36+
def random = ThreadLocalRandom.current
37+
def shallEnqueue(atSize: Int) =
38+
(atSize < queueThreshold) || {
39+
val dropFactor = (atSize - queueThreshold) >> 6
40+
random.nextInt(dropFactor + 2) == 0
41+
}
42+
43+
(1 to 8) foreach (_ => context.actorOf(Props(new Worker(self))))
44+
45+
def receive = {
46+
case job @ Job(id, _, replyTo) =>
47+
if (requestQueue.isEmpty) {
48+
val atSize = workQueue.size
49+
if (shallEnqueue(atSize)) workQueue :+= job
50+
else if (atSize < dropThreshold) replyTo ! JobRejected(id)
51+
} else {
52+
val WorkRequest(worker, items) = requestQueue.head
53+
worker ! job
54+
if (items > 1) worker ! DummyWork(items - 1)
55+
requestQueue = requestQueue.drop(1)
56+
}
57+
case wr @ WorkRequest(worker, items) =>
58+
if (workQueue.isEmpty) {
59+
requestQueue :+= wr
60+
} else {
61+
workQueue.iterator.take(items).foreach(job => worker ! job)
62+
if (workQueue.size < items) worker ! DummyWork(items - workQueue.size)
63+
workQueue = workQueue.drop(items)
64+
}
65+
}
66+
}
67+
68+
val mc = new MathContext(100, RoundingMode.HALF_EVEN)
69+
70+
class Worker(manager: ActorRef) extends Actor {
71+
val plus = BigDecimal(1, mc)
72+
val minus = BigDecimal(-1, mc)
73+
74+
var requested = 0
75+
def request(): Unit =
76+
if (requested < 5) {
77+
manager ! WorkRequest(self, 10)
78+
requested += 10
79+
}
80+
81+
request()
82+
83+
def receive = {
84+
case Job(id, data, replyTo) =>
85+
requested -= 1
86+
request()
87+
val sign = if ((data & 1) == 1) plus else minus
88+
val result = sign / data
89+
replyTo ! JobResult(id, result)
90+
case DummyWork(count) =>
91+
requested -= count
92+
request()
93+
}
94+
}
95+
96+
case class Report(success: Int, failure: Int, dropped: Int, value: BigDecimal) {
97+
def +(other: Report) =
98+
Report(success + other.success, failure + other.failure, dropped + other.dropped, value + other.value)
99+
}
100+
object Report {
101+
def success(v: BigDecimal) = Report(1, 0, 0, v)
102+
val failure = Report(0, 1, 0, BigDecimal(0, mc))
103+
val dropped = Report(0, 0, 1, BigDecimal(0, mc))
104+
val empty = Report(0, 0, 0, BigDecimal(0, mc))
105+
}
106+
107+
def main(args: Array[String]): Unit = {
108+
implicit val sys = ActorSystem("pi")
109+
import sys.dispatcher
110+
implicit val timeout = Timeout(1.seconds)
111+
implicit val materializer = ActorMaterializer()
112+
113+
val calculator = sys.actorOf(Props(new Manager), "manager")
114+
115+
Source(1 to 1000000)
116+
// experiment with the parallelism number to see dropping in effect
117+
.mapAsyncUnordered(1000) { i =>
118+
(calculator ? (Job(i, i, _)))
119+
.collect {
120+
case JobResult(_, report) => Report.success(report)
121+
case _ => Report.failure
122+
}
123+
.recover {
124+
case _: TimeoutException => Report.dropped
125+
}
126+
}
127+
.runFold(Report.empty)(_ + _)
128+
.map(x => println(s"final result: $x"))
129+
.recover {
130+
case ex =>
131+
ex.printStackTrace()
132+
}
133+
.foreach(_ => sys.terminate())
134+
}
135+
136+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter15
5+
6+
import java.math.MathContext
7+
import java.math.RoundingMode
8+
import java.util.concurrent.ThreadLocalRandom
9+
import java.util.concurrent.TimeoutException
10+
import scala.collection.immutable.Queue
11+
import scala.concurrent.Future
12+
import scala.concurrent.duration._
13+
import com.typesafe.config.ConfigFactory
14+
import akka.actor._
15+
import akka.pattern.extended.ask
16+
import akka.util.Timeout
17+
import akka.stream.scaladsl._
18+
import akka.stream.ActorMaterializer
19+
import scala.concurrent.Await
20+
21+
object DropPatternWithProtection {
22+
23+
case class Job(id: Long, input: Int, replyTo: ActorRef)
24+
case class JobRejected(id: Long)
25+
case class JobResult(id: Long, report: BigDecimal)
26+
27+
case class WorkRequest(worker: ActorRef, items: Int)
28+
case class DummyWork(count: Int)
29+
30+
private case class WorkEnvelope(job: Job) {
31+
@volatile var consumed = false
32+
}
33+
34+
case class GetIncomingRef(replyTo: ActorRef)
35+
36+
class Protector extends Actor {
37+
val manager = context.actorOf(Props(new Manager), "manager")
38+
val incomingQueue = context.actorOf(Props(new IncomingQueue(manager)).withMailbox("bounded-mailbox"), "incomingQueue")
39+
def receive = {
40+
case GetIncomingRef(replyTo) => replyTo ! incomingQueue
41+
}
42+
}
43+
44+
private class IncomingQueue(manager: ActorRef) extends Actor {
45+
var workQueue = Queue.empty[WorkEnvelope]
46+
def receive = {
47+
case job: Job =>
48+
workQueue = workQueue.dropWhile(_.consumed)
49+
if (workQueue.size < 1000) {
50+
val envelope = WorkEnvelope(job)
51+
workQueue :+= envelope
52+
manager ! envelope
53+
}
54+
}
55+
}
56+
57+
class Manager extends Actor {
58+
59+
var workQueue = Queue.empty[Job]
60+
var requestQueue = Queue.empty[WorkRequest]
61+
62+
val queueThreshold = 1000
63+
val dropThreshold = 1500
64+
def random = ThreadLocalRandom.current
65+
def shallEnqueue(atSize: Int) =
66+
(atSize < queueThreshold) || {
67+
val dropFactor = (atSize - queueThreshold) >> 6
68+
random.nextInt(dropFactor + 2) == 0
69+
}
70+
71+
(1 to 8) foreach (_ => context.actorOf(Props(new Worker(self))))
72+
73+
def receive = {
74+
case envelope @ WorkEnvelope(job @ Job(id, _, replyTo)) =>
75+
envelope.consumed = true
76+
if (requestQueue.isEmpty) {
77+
val atSize = workQueue.size
78+
if (shallEnqueue(atSize)) workQueue :+= job
79+
else if (atSize < dropThreshold) replyTo ! JobRejected(id)
80+
} else {
81+
val WorkRequest(worker, items) = requestQueue.head
82+
worker ! job
83+
if (items > 1) worker ! DummyWork(items - 1)
84+
requestQueue = requestQueue.drop(1)
85+
}
86+
case wr @ WorkRequest(worker, items) =>
87+
if (workQueue.isEmpty) {
88+
requestQueue :+= wr
89+
} else {
90+
workQueue.iterator.take(items).foreach(job => worker ! job)
91+
if (workQueue.size < items) worker ! DummyWork(items - workQueue.size)
92+
workQueue = workQueue.drop(items)
93+
}
94+
}
95+
}
96+
97+
val mc = new MathContext(100, RoundingMode.HALF_EVEN)
98+
99+
class Worker(manager: ActorRef) extends Actor {
100+
val plus = BigDecimal(1, mc)
101+
val minus = BigDecimal(-1, mc)
102+
103+
var requested = 0
104+
def request(): Unit =
105+
if (requested < 5) {
106+
manager ! WorkRequest(self, 10)
107+
requested += 10
108+
}
109+
110+
request()
111+
112+
def receive = {
113+
case Job(id, data, replyTo) =>
114+
requested -= 1
115+
request()
116+
val sign = if ((data & 1) == 1) plus else minus
117+
val result = sign / data
118+
replyTo ! JobResult(id, result)
119+
case DummyWork(count) =>
120+
requested -= count
121+
request()
122+
}
123+
}
124+
125+
case class Report(success: Int, failure: Int, dropped: Int, value: BigDecimal) {
126+
def +(other: Report) =
127+
Report(success + other.success, failure + other.failure, dropped + other.dropped, value + other.value)
128+
}
129+
object Report {
130+
def success(v: BigDecimal) = Report(1, 0, 0, v)
131+
val failure = Report(0, 1, 0, BigDecimal(0, mc))
132+
val dropped = Report(0, 0, 1, BigDecimal(0, mc))
133+
val empty = Report(0, 0, 0, BigDecimal(0, mc))
134+
}
135+
136+
def main(args: Array[String]): Unit = {
137+
val config = ConfigFactory.parseString("""
138+
bounded-mailbox {
139+
mailbox-type = "akka.dispatch.BoundedMailbox"
140+
mailbox-capacity = 1000
141+
mailbox-push-timeout-time = 0s
142+
}
143+
""")
144+
implicit val sys = ActorSystem("pi", config)
145+
import sys.dispatcher
146+
implicit val timeout = Timeout(1.seconds)
147+
implicit val materializer = ActorMaterializer()
148+
149+
val protector = sys.actorOf(Props(new Protector), "protector")
150+
val calculator = Await.result((protector ? GetIncomingRef).mapTo[ActorRef], 1.second)
151+
152+
Source(1 to 10000000)
153+
// experiment with the parallelism number to see dropping in effect
154+
.mapAsyncUnordered(100000) { i =>
155+
(calculator ? (Job(i, i, _)))
156+
.collect {
157+
case JobResult(_, report) => Report.success(report)
158+
case _ => Report.failure
159+
}
160+
.recover {
161+
case _: TimeoutException => Report.dropped
162+
}
163+
}
164+
.runFold(Report.empty)(_ + _)
165+
.map(x => println(s"final result: $x"))
166+
.recover {
167+
case ex =>
168+
ex.printStackTrace()
169+
}
170+
.foreach(_ => sys.terminate())
171+
}
172+
173+
}

chapter15/src/main/scala/com/reactivedesignpatterns/chapter15/PullPattern.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import akka.actor.Props
1414
object PullPattern {
1515

1616
case class Job(id: Long, input: Int, replyTo: ActorRef)
17-
case class Result(id: Long, report: BigDecimal)
17+
case class JobResult(id: Long, report: BigDecimal)
1818

1919
case class WorkRequest(worker: ActorRef, items: Int)
2020

@@ -37,7 +37,7 @@ object PullPattern {
3737
worker ! job
3838
outstandingWork += 1
3939
}
40-
case Result(id, report) =>
40+
case JobResult(id, report) =>
4141
approximation = aggregator(approximation, report)
4242
outstandingWork -= 1
4343
if (outstandingWork == 0 && workStream.isEmpty) {
@@ -67,7 +67,7 @@ object PullPattern {
6767
request()
6868
val sign = if ((data & 1) == 1) plus else minus
6969
val result = sign / data
70-
replyTo ! Result(id, result)
70+
replyTo ! JobResult(id, result)
7171
}
7272
}
7373

0 commit comments

Comments
 (0)