Skip to content

Commit 7044165

Browse files
authored
before bulk synch, unit test fix (#837)
1 parent 71d6401 commit 7044165

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,8 @@ public void flush() {
293293

294294
if (listener != null) {
295295
BulkRequest finalRequest = request;
296-
scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts));
296+
// synchronous execution to make sure it actually runs before
297+
listener.beforeBulk(id, finalRequest, requestContexts);
297298
}
298299

299300
CompletionStage<BulkResponse> result = client.bulk(request);

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.CountDownLatch;
4848
import java.util.concurrent.ExecutorService;
4949
import java.util.concurrent.Executors;
50+
import java.util.concurrent.ScheduledExecutorService;
5051
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.atomic.AtomicInteger;
5253
import java.util.concurrent.atomic.AtomicReference;
@@ -102,11 +103,18 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
102103
CountingListener listener = new CountingListener();
103104
TestTransport transport = new TestTransport();
104105
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
106+
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
107+
Thread t = Executors.defaultThreadFactory().newThread(r);
108+
t.setName("my-bulk-ingester-executor#" );
109+
t.setDaemon(true);
110+
return t;
111+
});
105112

106113
BulkIngester<Void> ingester = BulkIngester.of(b -> b
107114
.client(client)
108115
.maxOperations(maxOperations)
109116
.maxConcurrentRequests(maxRequests)
117+
.scheduler(scheduler)
110118
.listener(listener)
111119
);
112120

@@ -130,6 +138,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
130138

131139
ingester.close();
132140
transport.close();
141+
scheduler.shutdownNow();
133142

134143
printStats(ingester);
135144
printStats(listener);
@@ -181,7 +190,7 @@ public void periodicFlushTest() throws Exception {
181190
// Disable other flushing limits
182191
.maxSize(-1)
183192
.maxOperations(-1)
184-
.maxConcurrentRequests(Integer.MAX_VALUE)
193+
.maxConcurrentRequests(Integer.MAX_VALUE-1)
185194
);
186195

187196
// Add an operation every 100 ms to give time
@@ -242,7 +251,7 @@ public void afterBulk(long executionId, BulkRequest request, List<Void> contexts
242251
// Disable other flushing limits
243252
.maxSize(-1)
244253
.maxOperations(-1)
245-
.maxConcurrentRequests(Integer.MAX_VALUE)
254+
.maxConcurrentRequests(Integer.MAX_VALUE - 1)
246255
.listener(listener)
247256
);
248257

0 commit comments

Comments
 (0)