Skip to content

Commit 9f215f2

Browse files
committed
massive work on chapter13 samples
1 parent 7493804 commit 9f215f2

File tree

9 files changed

+491
-59
lines changed

9 files changed

+491
-59
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,5 +110,6 @@ src_managed/
110110
project/boot/
111111
.history
112112
.cache
113+
.cache-main
113114

114115
theDatabase*

chapter13/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
import Build._
22

3-
libraryDependencies ++= Seq(akkaActor, amazonAWS, scalaJava8, akkaTestkit, junit, scalatest)
3+
libraryDependencies ++= Seq(akkaActor, akkaStream, amazonAWS, scalaJava8, akkaTestkit, junit, scalatest)
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter13;
5+
6+
import java.util.Collection;
7+
import java.util.Collections;
8+
import java.util.Set;
9+
import java.util.stream.Stream;
10+
11+
import javax.script.Invocable;
12+
import javax.script.ScriptEngine;
13+
import javax.script.ScriptEngineManager;
14+
15+
import akka.japi.Pair;
16+
import akka.japi.function.*;
17+
import akka.stream.javadsl.*;
18+
import akka.stream.scaladsl.Keep;
19+
import scala.concurrent.Future;
20+
import scala.runtime.BoxedUnit;
21+
22+
public interface ComplexCommand {
23+
24+
public class DataElement {
25+
public final int value;
26+
27+
public DataElement(int value) {
28+
this.value = value;
29+
}
30+
}
31+
public interface PartialResult {}
32+
33+
public interface Result {}
34+
35+
public class PartSuccess implements PartialResult {
36+
public final int value;
37+
38+
public PartSuccess(int value) {
39+
this.value = value;
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "ResultSuccess(" + value + ")";
45+
}
46+
}
47+
48+
public class PartFailure implements PartialResult {
49+
public final Throwable failure;
50+
51+
public PartFailure(Throwable failure) {
52+
this.failure = failure;
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return "ResultFailure(" + failure.getMessage() + ")";
58+
}
59+
}
60+
61+
public interface ProcessingLogic {
62+
public PartialResult process(Stream<DataElement> input);
63+
}
64+
65+
public interface MergeLogic {
66+
public Result merge(Collection<PartialResult> partialResults);
67+
}
68+
69+
public class BatchJob {
70+
public final String dataSelector;
71+
public final ProcessingLogic processingLogic;
72+
public final MergeLogic mergeLogic;
73+
74+
public BatchJob(String dataSelector, ProcessingLogic processingLogic, MergeLogic mergeLogic) {
75+
this.dataSelector = dataSelector;
76+
this.processingLogic = processingLogic;
77+
this.mergeLogic = mergeLogic;
78+
}
79+
80+
public BatchJob withDataSelector(String selector) {
81+
return new BatchJob(selector, processingLogic, mergeLogic);
82+
}
83+
}
84+
85+
public class BatchJobJS {
86+
public final String dataSelector;
87+
public final String processingLogic;
88+
public final String mergeLogic;
89+
90+
public BatchJobJS(String dataSelector, String processingLogic, String mergeLogic) {
91+
this.dataSelector = dataSelector;
92+
this.processingLogic = processingLogic;
93+
this.mergeLogic = mergeLogic;
94+
}
95+
96+
public BatchJobJS withDataSelector(String selector) {
97+
return new BatchJobJS(selector, processingLogic, mergeLogic);
98+
}
99+
}
100+
101+
public class WorkerJS {
102+
public PartialResult runJob(BatchJobJS job) {
103+
ScriptEngine engine = new ScriptEngineManager().getEngineByName("nashorn");
104+
Invocable invocable = (Invocable) engine;
105+
try {
106+
engine.eval(job.processingLogic);
107+
final Stream<DataElement> input = provideData(job.dataSelector);
108+
PartialResult result = (PartialResult) invocable.invokeFunction("process", input);
109+
return result;
110+
} catch (Exception e) {
111+
return new PartFailure(e);
112+
}
113+
}
114+
private Stream<DataElement> provideData(String selector) {
115+
/* fetch data from persistent storage in streaming fashion */
116+
return Stream.of(1, 2, 3).map(DataElement::new);
117+
}
118+
}
119+
120+
class InRange implements Predicate<DataElement> {
121+
private static final long serialVersionUID = 1L;
122+
public final String fieldname;
123+
public final Number min;
124+
public final Number max;
125+
public InRange(String fieldname, Number min, Number max) {
126+
this.fieldname = fieldname;
127+
this.min = min;
128+
this.max = max;
129+
}
130+
@Override
131+
public boolean test(DataElement arg0) {
132+
// TODO Auto-generated method stub
133+
return false;
134+
}
135+
}
136+
137+
class Median<T> implements Function2<T, DataElement, T> {
138+
private static final long serialVersionUID = 1L;
139+
public final String fieldname;
140+
141+
public Median(String fieldname) {
142+
this.fieldname = fieldname;
143+
}
144+
145+
@Override
146+
public T apply(T arg0, DataElement arg1) throws Exception {
147+
// TODO Auto-generated method stub
148+
return null;
149+
}
150+
}
151+
152+
class Inject<T> implements Function<DataElement, DataElement> {
153+
private static final long serialVersionUID = 1L;
154+
public final RunnableGraph<Future<T>> value;
155+
public final String fieldname;
156+
157+
public Inject(RunnableGraph<Future<T>> value, String fieldname) {
158+
this.value = value;
159+
this.fieldname = fieldname;
160+
}
161+
162+
@Override
163+
public DataElement apply(DataElement arg0) throws Exception {
164+
// TODO Auto-generated method stub
165+
return null;
166+
}
167+
}
168+
169+
class Filter implements Predicate<DataElement> {
170+
private static final long serialVersionUID = 1L;
171+
public final String expression;
172+
173+
public Filter(String expression) {
174+
this.expression = expression;
175+
}
176+
177+
@Override
178+
public boolean test(DataElement arg0) {
179+
// TODO Auto-generated method stub
180+
return false;
181+
}
182+
}
183+
184+
class DistinctValues<T> implements Function2<Set<T>, DataElement, Set<T>> {
185+
private static final long serialVersionUID = 1L;
186+
public final String[] fields;
187+
188+
public DistinctValues(String... fields) {
189+
this.fields = fields;
190+
}
191+
192+
@Override
193+
public Set<T> apply(Set<T> arg0, DataElement arg1) throws Exception {
194+
// TODO Auto-generated method stub
195+
return null;
196+
}
197+
198+
}
199+
200+
public static void akkaStreamDSL() {
201+
RunnableGraph<Future<Long>> p =
202+
Source.<DataElement>empty()
203+
.filter(new InRange("year", 1950, 1960))
204+
.toMat(Sink.fold(0L, new Median<Long>("price")), Keep.<BoxedUnit, Long>right());
205+
206+
Source.<DataElement>empty()
207+
.map(new Inject<Long>(p, "p"))
208+
.filter(new Filter("price > p"))
209+
.to(Sink.fold(Collections.emptySet(), new DistinctValues<Pair<String, String>>("make", "model")));
210+
}
211+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.reactivedesignpatterns.chapter13;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.Scanner;
6+
7+
import com.reactivedesignpatterns.chapter13.ComplexCommand.*;
8+
9+
public class ComplexCommandTest {
10+
public static void main(String[] args) throws IOException {
11+
try (InputStream js = ComplexCommandTest.class.getResourceAsStream("/com/reactivedesignpatterns/chapter13/job.js");
12+
Scanner s = new Scanner(js, "UTF-8")) {
13+
s.useDelimiter("\\A");
14+
final BatchJobJS job = new BatchJobJS("", s.next(), "");
15+
final WorkerJS worker = new WorkerJS();
16+
final PartialResult result = worker.runJob(job);
17+
System.out.println(result);
18+
}
19+
}
20+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.reactivedesignpatterns.chapter13;
2+
3+
import java.sql.Connection;
4+
import java.sql.ResultSet;
5+
import java.util.LinkedList;
6+
import java.util.List;
7+
import java.util.concurrent.Executor;
8+
import java.util.concurrent.LinkedBlockingDeque;
9+
import java.util.concurrent.RejectedExecutionException;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
12+
import static java.util.concurrent.TimeUnit.SECONDS;
13+
14+
import javax.sql.DataSource;
15+
16+
import akka.actor.AbstractActor;
17+
import akka.actor.ActorRef;
18+
import akka.japi.pf.ReceiveBuilder;
19+
20+
public interface ManagedBlocking {
21+
22+
public enum AccessRights {
23+
READ_JOB_STATUS,
24+
SUBMIT_JOB;
25+
26+
public static final AccessRights[] EMPTY = new AccessRights[] {};
27+
}
28+
29+
public class CheckAccess {
30+
public final String username;
31+
public final String credentials;
32+
public final AccessRights[] rights;
33+
public final ActorRef replyTo;
34+
35+
public CheckAccess(String username, String credentials, AccessRights[] rights, ActorRef replyTo) {
36+
this.username = username;
37+
this.credentials = credentials;
38+
this.rights = rights;
39+
this.replyTo = replyTo;
40+
}
41+
}
42+
43+
public class CheckAccessResult {
44+
public final String username;
45+
public final String credentials;
46+
public final AccessRights[] rights;
47+
48+
public CheckAccessResult(String username, String credentials, AccessRights[] rights) {
49+
this.username = username;
50+
this.credentials = credentials;
51+
this.rights = rights;
52+
}
53+
}
54+
55+
public class AccessService extends AbstractActor {
56+
private Executor pool;
57+
58+
public AccessService(DataSource db, int poolSize, int queueSize) {
59+
pool = new ThreadPoolExecutor(0, poolSize, 60, SECONDS, new LinkedBlockingDeque<>(queueSize));
60+
61+
final ActorRef self = self();
62+
receive(ReceiveBuilder
63+
.match(CheckAccess.class, ca -> {
64+
try {
65+
pool.execute(() -> checkAccess(db, ca, self));
66+
} catch (RejectedExecutionException e) {
67+
ca.replyTo.tell(new CheckAccessResult(ca.username, ca.credentials, AccessRights.EMPTY), self);
68+
}})
69+
.build());
70+
}
71+
72+
private static void checkAccess(DataSource db, CheckAccess ca, ActorRef self) {
73+
try {
74+
final Connection conn = db.getConnection();
75+
final ResultSet result = conn.createStatement().executeQuery("<figure out access rights>");
76+
final List<AccessRights> rights = new LinkedList<>();
77+
while (result.next()) {
78+
rights.add(AccessRights.valueOf(result.getString(0)));
79+
}
80+
ca.replyTo.tell(new CheckAccessResult(ca.username, ca.credentials, rights.toArray(AccessRights.EMPTY)), self);
81+
} catch (Exception e) {
82+
ca.replyTo.tell(new CheckAccessResult(ca.username, ca.credentials, AccessRights.EMPTY), self);
83+
}
84+
}
85+
}
86+
87+
}

0 commit comments

Comments
 (0)