Skip to content

Commit 1fef4f1

Browse files
committed
finish chapter 16
1 parent 214764c commit 1fef4f1

File tree

1 file changed

+134
-0
lines changed

1 file changed

+134
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
import akka.actor._
7+
import akka.stream._
8+
import akka.stream.scaladsl._
9+
import akka.persistence.journal._
10+
import akka.persistence.query._
11+
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
12+
import com.typesafe.config.ConfigFactory
13+
import java.net.URI
14+
import java.util.concurrent.ThreadLocalRandom
15+
import scala.concurrent.duration._
16+
17+
class ShoppingCartTagging(system: ExtendedActorSystem) extends WriteEventAdapter {
18+
def manifest(event: Any): String = ""
19+
20+
def toJournal(event: Any): Any =
21+
event match {
22+
case s: ShoppingCartMessage => Tagged(event, Set("shoppingCart"))
23+
case other => other
24+
}
25+
}
26+
27+
class ShoppingCartSimulator extends Actor with ActorLogging {
28+
def rnd = ThreadLocalRandom.current
29+
30+
val items = Array(
31+
"apple",
32+
"banana",
33+
"plum",
34+
"pear",
35+
"peach").map(f => ItemRef(new URI(f)))
36+
def pickItem() = items(rnd.nextInt(items.length))
37+
38+
val customers = Array(
39+
"alice",
40+
"bob",
41+
"charlie",
42+
"mallory").map(c => CustomerRef(new URI(c)))
43+
def pickCustomer() = customers(rnd.nextInt(customers.length))
44+
45+
val id = Iterator from 0
46+
def command(cmd: Command) = ManagerCommand(cmd, id.next, self)
47+
48+
def driveCart(num: Int) = {
49+
val cartRef = ShoppingCartRef(new URI(f"cart$num%08X"))
50+
val manager = context.actorOf(Props(new PersistentObjectManager), cartRef.id.toString)
51+
manager ! command(SetOwner(cartRef, pickCustomer()))
52+
while (rnd.nextDouble() < 0.95) {
53+
val cmd =
54+
if (rnd.nextBoolean()) AddItem(cartRef, pickItem(), rnd.nextInt(14) + 1)
55+
else RemoveItem(cartRef, pickItem(), rnd.nextInt(10) + 1)
56+
manager ! command(cmd)
57+
}
58+
manager ! ManagerQuery(GetItems(cartRef), num, self)
59+
}
60+
61+
case class Cont(id: Int)
62+
self ! Cont(0)
63+
64+
def receive = {
65+
case Cont(n) => driveCart(n)
66+
case ManagerEvent(id, _) => if (id % 10000 == 0) log.info("done {} commands", id)
67+
case ManagerResult(num, GetItemsResult(cart, items)) =>
68+
context.stop(context.child(cart.id.toString).get)
69+
self ! Cont(num.toInt + 1)
70+
}
71+
}
72+
73+
case class GetTopProducts(id: Long, replyTo: ActorRef)
74+
case class TopProducts(id: Long, products: Map[ItemRef, Int])
75+
76+
object TopProductListener {
77+
private class IntHolder(var value: Int)
78+
}
79+
80+
class TopProductListener extends Actor with ActorLogging {
81+
import TopProductListener._
82+
implicit val materializer = ActorMaterializer()
83+
84+
val readJournal =
85+
PersistenceQuery(context.system)
86+
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
87+
88+
readJournal.eventsByTag("shoppingCart", 0)
89+
.collect { case EventEnvelope(_, _, _, add: ItemAdded) => add }
90+
.groupedWithin(100000, 1.second)
91+
.addAttributes(Attributes.asyncBoundary)
92+
.runForeach { seq =>
93+
val histogram = seq.foldLeft(Map.empty[ItemRef, IntHolder]) { (map, event) =>
94+
map.get(event.item) match {
95+
case Some(holder) => { holder.value += event.count; map }
96+
case None => map.updated(event.item, new IntHolder(event.count))
97+
}
98+
}
99+
self ! TopProducts(0, histogram.map(p => (p._1, p._2.value)))
100+
}
101+
102+
var topProducts = Map.empty[ItemRef, Int]
103+
104+
def receive = {
105+
case GetTopProducts(id, replyTo) => replyTo ! TopProducts(id, topProducts)
106+
case TopProducts(_, products) =>
107+
topProducts = products
108+
log.info("new {}", products)
109+
}
110+
}
111+
112+
object EventStreamExample extends App {
113+
val config = ConfigFactory.parseString("""
114+
akka.loglevel = INFO
115+
akka.actor.debug.unhandled = on
116+
akka.actor.warn-about-java-serializer-usage = off
117+
akka.persistence.snapshot-store.plugin = "akka.persistence.no-snapshot-store"
118+
akka.persistence.journal {
119+
plugin = "akka.persistence.journal.leveldb"
120+
leveldb {
121+
native = off
122+
event-adapters {
123+
tagging = "com.reactivedesignpatterns.chapter16.ShoppingCartTagging"
124+
}
125+
event-adapter-bindings {
126+
"com.reactivedesignpatterns.chapter16.ShoppingCartMessage" = tagging
127+
}
128+
}
129+
}
130+
""")
131+
val sys = ActorSystem("EventStream", config)
132+
sys.actorOf(Props(new ShoppingCartSimulator), "simulator")
133+
sys.actorOf(Props(new TopProductListener), "listener")
134+
}

0 commit comments

Comments
 (0)