Skip to content

Commit 214764c

Browse files
committed
start chapter 16
1 parent 28f675d commit 214764c

File tree

8 files changed

+366
-1
lines changed

8 files changed

+366
-1
lines changed

chapter16/build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
import Build._
22

3-
libraryDependencies ++= Seq(akkaActor, akkaTyped, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)
3+
libraryDependencies ++= Seq(akkaActor, akkaSharding, akkaDData, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
trait ShoppingCartMessage {
7+
def shoppingCart: ShoppingCartRef
8+
}
9+
10+
sealed trait Command extends ShoppingCartMessage
11+
case class SetOwner(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Command
12+
case class AddItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command
13+
case class RemoveItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command
14+
15+
sealed trait Query extends ShoppingCartMessage
16+
case class GetItems(shoppingCart: ShoppingCartRef) extends Query
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
import java.net.URI
7+
8+
case class ItemRef(id: URI)
9+
case class CustomerRef(id: URI)
10+
case class ShoppingCartRef(id: URI)
11+
12+
case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {
13+
def setOwner(customer: CustomerRef): ShoppingCart = {
14+
require(owner.isEmpty, "owner cannot be overwritten")
15+
copy(owner = Some(customer))
16+
}
17+
18+
def addItem(item: ItemRef, count: Int): ShoppingCart = {
19+
require(count > 0, s"count must be positive (trying to add $item with count $count)")
20+
val currentCount = items.get(item).getOrElse(0)
21+
copy(items = items.updated(item, currentCount + count))
22+
}
23+
24+
def removeItem(item: ItemRef, count: Int): ShoppingCart = {
25+
require(count > 0, s"count must be positive (trying to remove $item with count $count)")
26+
val currentCount = items.get(item).getOrElse(0)
27+
val newCount = currentCount - count
28+
if (newCount <= 0) copy(items = items - item)
29+
else copy(items = items.updated(item, newCount))
30+
}
31+
32+
// This is here for section 16.3
33+
def applyEvent(event: Event): ShoppingCart = event match {
34+
case OwnerChanged(_, owner) => setOwner(owner)
35+
case ItemAdded(_, item, count) => addItem(item, count)
36+
case ItemRemoved(_, item, count) => removeItem(item, count)
37+
}
38+
}
39+
40+
object ShoppingCart {
41+
val empty = ShoppingCart(Map.empty, None)
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
import akka.persistence.PersistentActor
7+
import akka.actor._
8+
import java.net.URI
9+
import java.util.UUID
10+
import com.typesafe.config.ConfigFactory
11+
12+
class PersistentObjectManager extends PersistentActor {
13+
// we expect the name to be the shopping card ID
14+
override def persistenceId = context.self.path.name
15+
16+
var shoppingCart = ShoppingCart.empty
17+
18+
def receiveCommand = {
19+
case ManagerCommand(cmd, id, replyTo) =>
20+
try {
21+
val event = cmd match {
22+
case SetOwner(cart, owner) => OwnerChanged(cart, owner)
23+
case AddItem(cart, item, count) => ItemAdded(cart, item, count)
24+
case RemoveItem(cart, item, count) => ItemRemoved(cart, item, count)
25+
}
26+
// perform the update here in order to treat validation errors immediately
27+
shoppingCart = shoppingCart.applyEvent(event)
28+
persist(event) { _ =>
29+
replyTo ! ManagerEvent(id, event)
30+
}
31+
} catch {
32+
case ex: IllegalArgumentException =>
33+
replyTo ! ManagerRejection(id, ex.getMessage)
34+
}
35+
case ManagerQuery(cmd, id, replyTo) =>
36+
try {
37+
val result = cmd match {
38+
case GetItems(cart) => GetItemsResult(cart, shoppingCart.items)
39+
}
40+
replyTo ! ManagerResult(id, result)
41+
} catch {
42+
case ex: IllegalArgumentException =>
43+
replyTo ! ManagerRejection(id, ex.getMessage)
44+
}
45+
}
46+
47+
def receiveRecover = {
48+
case e: Event => shoppingCart = shoppingCart.applyEvent(e)
49+
}
50+
}
51+
52+
/*
53+
* Running the application for the first time will do the same as the ManagerExample.
54+
* Running it again will read the persisted state from the local working directory
55+
* and consequently reject the SetOwner command, plus it will list four entries in the
56+
* GetItemsResult.
57+
*/
58+
object PersistentObjectManagerExample extends App {
59+
def mkURI(): URI = URI.create(UUID.randomUUID().toString)
60+
61+
val config = ConfigFactory.parseString("""
62+
akka.actor.warn-about-java-serializer-usage = off
63+
akka.persistence.journal {
64+
plugin = "akka.persistence.journal.leveldb"
65+
leveldb.native=off
66+
}
67+
akka.persistence.snapshot-store.plugin = "akka.persistence.no-snapshot-store"
68+
""")
69+
val sys = ActorSystem("ObjectManager", config)
70+
71+
val customer = CustomerRef(mkURI())
72+
val item1, item2 = ItemRef(mkURI())
73+
val shoppingCart = ShoppingCartRef(new URI("myCart"))
74+
75+
val manager = sys.actorOf(Props(new PersistentObjectManager), shoppingCart.id.toString)
76+
77+
sys.actorOf(Props(new Actor with ActorLogging {
78+
manager ! ManagerCommand(SetOwner(shoppingCart, customer), 0, self)
79+
manager ! ManagerCommand(AddItem(shoppingCart, item1, 5), 1, self)
80+
manager ! ManagerCommand(AddItem(shoppingCart, item1, -3), 2, self)
81+
manager ! ManagerCommand(AddItem(shoppingCart, item2, 6), 3, self)
82+
manager ! ManagerCommand(RemoveItem(shoppingCart, item1, 3), 4, self)
83+
manager ! ManagerQuery(GetItems(shoppingCart), 5, self)
84+
85+
def receive = {
86+
case ManagerEvent(id, event) => log.info("success ({}): {}", id, event)
87+
case ManagerRejection(id, msg) => log.warning("rejected ({}): {}", id, msg)
88+
case ManagerResult(id, result) =>
89+
log.info("result ({}): {}", id, result)
90+
context.system.terminate()
91+
}
92+
}), "client")
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
sealed trait Event extends ShoppingCartMessage
7+
case class OwnerChanged(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Event
8+
case class ItemAdded(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event
9+
case class ItemRemoved(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event
10+
11+
sealed trait Result extends ShoppingCartMessage
12+
case class GetItemsResult(shoppingCart: ShoppingCartRef, items: Map[ItemRef, Int]) extends Result
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
import java.net.URI
7+
import java.util.UUID
8+
import akka.actor._
9+
import scala.util.control.NonFatal
10+
11+
case class ManagerCommand(cmd: Command, id: Long, replyTo: ActorRef)
12+
case class ManagerEvent(id: Long, event: Event)
13+
case class ManagerQuery(cmd: Query, id: Long, replyTo: ActorRef)
14+
case class ManagerResult(id: Long, result: Result)
15+
case class ManagerRejection(id: Long, reason: String)
16+
17+
class Manager(var shoppingCart: ShoppingCart) extends Actor {
18+
/*
19+
* this is the usual constructor, the above allows priming with
20+
* previously persisted state.
21+
*/
22+
def this() = this(ShoppingCart.empty)
23+
24+
def receive = {
25+
case ManagerCommand(cmd, id, replyTo) =>
26+
try {
27+
val event = cmd match {
28+
case SetOwner(cart, owner) =>
29+
shoppingCart = shoppingCart.setOwner(owner)
30+
OwnerChanged(cart, owner)
31+
case AddItem(cart, item, count) =>
32+
shoppingCart = shoppingCart.addItem(item, count)
33+
ItemAdded(cart, item, count)
34+
case RemoveItem(cart, item, count) =>
35+
shoppingCart = shoppingCart.removeItem(item, count)
36+
ItemRemoved(cart, item, count)
37+
}
38+
replyTo ! ManagerEvent(id, event)
39+
} catch {
40+
case ex: IllegalArgumentException =>
41+
replyTo ! ManagerRejection(id, ex.getMessage)
42+
}
43+
case ManagerQuery(cmd, id, replyTo) =>
44+
try {
45+
val result = cmd match {
46+
case GetItems(cart) =>
47+
GetItemsResult(cart, shoppingCart.items)
48+
}
49+
replyTo ! ManagerResult(id, result)
50+
} catch {
51+
case ex: IllegalArgumentException =>
52+
replyTo ! ManagerRejection(id, ex.getMessage)
53+
}
54+
}
55+
}
56+
57+
object ManagerExample extends App {
58+
def mkURI(): URI = URI.create(UUID.randomUUID().toString)
59+
60+
val sys = ActorSystem("ObjectManager")
61+
62+
val customer = CustomerRef(mkURI())
63+
val item1, item2 = ItemRef(mkURI())
64+
val shoppingCart = ShoppingCartRef(mkURI())
65+
66+
val manager = sys.actorOf(Props(new Manager), "manager")
67+
68+
sys.actorOf(Props(new Actor with ActorLogging {
69+
manager ! ManagerCommand(SetOwner(shoppingCart, customer), 0, self)
70+
manager ! ManagerCommand(AddItem(shoppingCart, item1, 5), 1, self)
71+
manager ! ManagerCommand(AddItem(shoppingCart, item1, -3), 2, self)
72+
manager ! ManagerCommand(AddItem(shoppingCart, item2, 6), 3, self)
73+
manager ! ManagerCommand(RemoveItem(shoppingCart, item1, 3), 4, self)
74+
manager ! ManagerQuery(GetItems(shoppingCart), 5, self)
75+
76+
def receive = {
77+
case ManagerEvent(id, event) => log.info("success ({}): {}", id, event)
78+
case ManagerRejection(id, msg) => log.warning("rejected ({}): {}", id, msg)
79+
case ManagerResult(id, result) =>
80+
log.info("result ({}): {}", id, result)
81+
context.system.terminate()
82+
}
83+
}), "client")
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter16
5+
6+
import com.typesafe.config.ConfigFactory
7+
import akka.actor._
8+
import akka.cluster._
9+
import akka.cluster.sharding._
10+
import java.net.URI
11+
import java.util.UUID
12+
13+
object ShardSupport {
14+
/*
15+
* use the shoppingCart reference as the sharding key; the partial function
16+
* must return both the key and the message to be forwarded, and if it does
17+
* not match then the message is dropped
18+
*/
19+
val extractEntityId: ShardRegion.ExtractEntityId = {
20+
case mc @ ManagerCommand(cmd, _, _) => cmd.shoppingCart.id.toString -> mc
21+
case mc @ ManagerQuery(query, _, _) => query.shoppingCart.id.toString -> mc
22+
}
23+
24+
/*
25+
* allocate shoppingCarts into 256 shards based on the low 8 bits of their
26+
* ID’s hash; this is a total function that must be defined for all messages
27+
* that are forwarded
28+
*/
29+
val extractShardId: ShardRegion.ExtractShardId = {
30+
case ManagerCommand(cmd, _, _) => toHex(cmd.shoppingCart.id.hashCode & 255)
31+
case ManagerQuery(query, _, _) => toHex(query.shoppingCart.id.hashCode & 255)
32+
}
33+
private def toHex(b: Int) = new java.lang.StringBuilder(2).append(hexDigits(b >> 4)).append(hexDigits(b & 15)).toString
34+
private val hexDigits = "0123456789ABCDEF"
35+
36+
val RegionName = "ShoppingCart"
37+
}
38+
39+
object ShardingExample extends App {
40+
val clusterConfig = ConfigFactory.parseString("""
41+
akka.loglevel = INFO
42+
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
43+
akka.actor.warn-about-java-serializer-usage = off
44+
akka.cluster.min-nr-of-members = 2
45+
akka.remote.netty.tcp {
46+
hostname = localhost
47+
port = 0
48+
}
49+
akka.cluster.sharding.state-store-mode = ddata
50+
""")
51+
val node1Config = ConfigFactory.parseString("akka.remote.netty.tcp.port = 2552")
52+
53+
val sys1 = ActorSystem("ShardingExample", node1Config.withFallback(clusterConfig))
54+
val seed = Cluster(sys1).selfAddress
55+
56+
def startNode(sys: ActorSystem): Unit = {
57+
Cluster(sys).join(seed)
58+
ClusterSharding(sys).start(
59+
typeName = ShardSupport.RegionName,
60+
entityProps = Props(new Manager),
61+
settings = ClusterShardingSettings(sys1),
62+
extractEntityId = ShardSupport.extractEntityId,
63+
extractShardId = ShardSupport.extractShardId)
64+
}
65+
66+
startNode(sys1)
67+
68+
val sys2 = ActorSystem("ShardingExample", clusterConfig)
69+
startNode(sys2)
70+
71+
/*
72+
* From this point onward we can talk to the sharded shopping carts via
73+
* the shard region which acts as a local mediator that will send the
74+
* commands to the right node.
75+
*/
76+
val manager = ClusterSharding(sys1).shardRegion(ShardSupport.RegionName)
77+
78+
def mkURI(): URI = URI.create(UUID.randomUUID().toString)
79+
80+
val customer = CustomerRef(mkURI())
81+
val item1, item2 = ItemRef(mkURI())
82+
val shoppingCart1, shoppingCart2 = ShoppingCartRef(mkURI())
83+
84+
Cluster(sys1).registerOnMemberUp(
85+
sys1.actorOf(Props(new Actor with ActorLogging {
86+
manager ! ManagerCommand(SetOwner(shoppingCart1, customer), 0, self)
87+
manager ! ManagerCommand(AddItem(shoppingCart1, item1, 5), 1, self)
88+
manager ! ManagerCommand(AddItem(shoppingCart1, item1, -3), 2, self)
89+
manager ! ManagerCommand(AddItem(shoppingCart1, item2, 6), 3, self)
90+
manager ! ManagerCommand(RemoveItem(shoppingCart1, item1, 3), 4, self)
91+
manager ! ManagerQuery(GetItems(shoppingCart1), 5, self)
92+
93+
def receive = {
94+
case ManagerEvent(id, event) => log.info("success ({}): {}", id, event)
95+
case ManagerRejection(id, msg) => log.warning("rejected ({}): {}", id, msg)
96+
case ManagerResult(id, result) =>
97+
log.info("result ({}): {}", id, result)
98+
99+
manager ! ManagerCommand(SetOwner(shoppingCart2, customer), 10, self)
100+
manager ! ManagerCommand(AddItem(shoppingCart2, item2, 15), 11, self)
101+
manager ! ManagerCommand(AddItem(shoppingCart2, item2, -3), 12, self)
102+
manager ! ManagerCommand(AddItem(shoppingCart2, item1, 60), 13, self)
103+
manager ! ManagerCommand(RemoveItem(shoppingCart2, item2, 3), 14, self)
104+
manager ! ManagerQuery(GetItems(shoppingCart2), 15, self)
105+
106+
context.become(second)
107+
}
108+
def second: Receive = {
109+
case ManagerEvent(id, event) => log.info("success ({}): {}", id, event)
110+
case ManagerRejection(id, msg) => log.warning("rejected ({}): {}", id, msg)
111+
case ManagerResult(id, result) =>
112+
log.info("result ({}): {}", id, result)
113+
sys1.terminate()
114+
sys2.terminate()
115+
}
116+
}), "client"))
117+
}

project/Build.scala

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ object Build {
66
val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
77
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
88
val akkaContrib = "com.typesafe.akka" %% "akka-contrib" % akkaVersion
9+
val akkaSharding= "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
910
val akkaDData = "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
1011
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1"
1112
val akkaTyped = "com.typesafe.akka" %% "akka-typed-experimental" % akkaVersion

0 commit comments

Comments
 (0)