Skip to content

Commit 5d53c9a

Browse files
authored
Add protection for OOM during aggregations partial reduction (#110520)
This commit adds a check of the parent circuit breaker on every 1024th call to the buckets consumer during the partial reduction of aggregations.
1 parent d7d86b4 commit 5d53c9a

File tree

11 files changed

+113
-16
lines changed

11 files changed

+113
-16
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class TermsReduceBenchmark {
7171
private final SearchPhaseController controller = new SearchPhaseController((task, req) -> new AggregationReduceContext.Builder() {
7272
@Override
7373
public AggregationReduceContext forPartialReduction() {
74-
return new AggregationReduceContext.ForPartial(null, null, task, builder);
74+
return new AggregationReduceContext.ForPartial(null, null, task, builder, b -> {});
7575
}
7676

7777
@Override

docs/changelog/110520.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 110520
2+
summary: Add protection for OOM during aggregations partial reduction
3+
area: Aggregations
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/search/SearchService.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -1840,7 +1840,13 @@ public AggregationReduceContext.Builder aggReduceContextBuilder(Supplier<Boolean
18401840
return new AggregationReduceContext.Builder() {
18411841
@Override
18421842
public AggregationReduceContext forPartialReduction() {
1843-
return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled, aggs);
1843+
return new AggregationReduceContext.ForPartial(
1844+
bigArrays,
1845+
scriptService,
1846+
isCanceled,
1847+
aggs,
1848+
multiBucketConsumerService.createForPartial()
1849+
);
18441850
}
18451851

18461852
@Override
@@ -1850,7 +1856,7 @@ public AggregationReduceContext forFinalReduction() {
18501856
scriptService,
18511857
isCanceled,
18521858
aggs,
1853-
multiBucketConsumerService.create()
1859+
multiBucketConsumerService.createForFinal()
18541860
);
18551861
}
18561862
};

server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,28 @@ public final AggregationReduceContext forAgg(String name) {
139139
* A {@linkplain AggregationReduceContext} to perform a partial reduction.
140140
*/
141141
public static final class ForPartial extends AggregationReduceContext {
142+
private final IntConsumer multiBucketConsumer;
143+
142144
public ForPartial(
143145
BigArrays bigArrays,
144146
ScriptService scriptService,
145147
Supplier<Boolean> isCanceled,
146-
AggregatorFactories.Builder builders
148+
AggregatorFactories.Builder builders,
149+
IntConsumer multiBucketConsumer
147150
) {
148151
super(bigArrays, scriptService, isCanceled, builders);
152+
this.multiBucketConsumer = multiBucketConsumer;
149153
}
150154

151-
public ForPartial(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled, AggregationBuilder builder) {
155+
public ForPartial(
156+
BigArrays bigArrays,
157+
ScriptService scriptService,
158+
Supplier<Boolean> isCanceled,
159+
AggregationBuilder builder,
160+
IntConsumer multiBucketConsumer
161+
) {
152162
super(bigArrays, scriptService, isCanceled, builder);
163+
this.multiBucketConsumer = multiBucketConsumer;
153164
}
154165

155166
@Override
@@ -158,7 +169,9 @@ public boolean isFinalReduce() {
158169
}
159170

160171
@Override
161-
protected void consumeBucketCountAndMaybeBreak(int size) {}
172+
protected void consumeBucketCountAndMaybeBreak(int size) {
173+
multiBucketConsumer.accept(size);
174+
}
162175

163176
@Override
164177
public PipelineTree pipelineTreeRoot() {
@@ -167,7 +180,7 @@ public PipelineTree pipelineTreeRoot() {
167180

168181
@Override
169182
protected AggregationReduceContext forSubAgg(AggregationBuilder sub) {
170-
return new ForPartial(bigArrays(), scriptService(), isCanceled(), sub);
183+
return new ForPartial(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer);
171184
}
172185
}
173186

server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,37 @@ public int getCount() {
134134
}
135135
}
136136

137-
public MultiBucketConsumer create() {
137+
/**
138+
* Similar to {@link MultiBucketConsumer} but it only checks the parent circuit breaker every 1024 calls.
139+
* It provides protection for OOM during partial reductions.
140+
*/
141+
private static class MultiBucketConsumerPartialReduction implements IntConsumer {
142+
private final CircuitBreaker breaker;
143+
144+
// aggregations execute in a single thread so no atomic here
145+
private int callCount = 0;
146+
147+
private MultiBucketConsumerPartialReduction(CircuitBreaker breaker) {
148+
this.breaker = breaker;
149+
}
150+
151+
@Override
152+
public void accept(int value) {
153+
// check parent circuit breaker every 1024 calls
154+
if ((++callCount & 0x3FF) == 0) {
155+
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
156+
}
157+
}
158+
}
159+
160+
public IntConsumer createForFinal() {
138161
return new MultiBucketConsumer(maxBucket, breaker);
139162
}
140163

164+
public IntConsumer createForPartial() {
165+
return new MultiBucketConsumerPartialReduction(breaker);
166+
}
167+
141168
public int getLimit() {
142169
return maxBucket;
143170
}

server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,13 @@ public void setup() {
5353
searchPhaseController = new SearchPhaseController((t, s) -> new AggregationReduceContext.Builder() {
5454
@Override
5555
public AggregationReduceContext forPartialReduction() {
56-
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, mock(AggregationBuilder.class));
56+
return new AggregationReduceContext.ForPartial(
57+
BigArrays.NON_RECYCLING_INSTANCE,
58+
null,
59+
t,
60+
mock(AggregationBuilder.class),
61+
b -> {}
62+
);
5763
}
5864

5965
public AggregationReduceContext forFinalReduction() {

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void setup() {
122122
@Override
123123
public AggregationReduceContext forPartialReduction() {
124124
reductions.add(false);
125-
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, agg);
125+
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, agg, b -> {});
126126
}
127127

128128
public AggregationReduceContext forFinalReduction() {

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
import org.elasticsearch.cluster.routing.ShardRouting;
4848
import org.elasticsearch.cluster.routing.ShardRoutingState;
4949
import org.elasticsearch.cluster.routing.TestShardRouting;
50+
import org.elasticsearch.cluster.service.ClusterService;
5051
import org.elasticsearch.common.UUIDs;
52+
import org.elasticsearch.common.breaker.CircuitBreakingException;
53+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
5154
import org.elasticsearch.common.bytes.BytesArray;
5255
import org.elasticsearch.common.bytes.BytesReference;
5356
import org.elasticsearch.common.io.stream.StreamInput;
@@ -151,6 +154,7 @@
151154
import java.util.concurrent.atomic.AtomicInteger;
152155
import java.util.function.Consumer;
153156
import java.util.function.Function;
157+
import java.util.function.IntConsumer;
154158
import java.util.function.Supplier;
155159

156160
import static java.util.Collections.emptyList;
@@ -1985,6 +1989,38 @@ public void testCreateReduceContext() {
19851989
}
19861990
}
19871991

1992+
public void testMultiBucketConsumerServiceCB() {
1993+
MultiBucketConsumerService service = new MultiBucketConsumerService(
1994+
getInstanceFromNode(ClusterService.class),
1995+
Settings.EMPTY,
1996+
new NoopCircuitBreaker("test") {
1997+
1998+
@Override
1999+
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
2000+
throw new CircuitBreakingException("tripped", getDurability());
2001+
}
2002+
}
2003+
);
2004+
// for partial
2005+
{
2006+
IntConsumer consumer = service.createForPartial();
2007+
for (int i = 0; i < 1023; i++) {
2008+
consumer.accept(0);
2009+
}
2010+
CircuitBreakingException ex = expectThrows(CircuitBreakingException.class, () -> consumer.accept(0));
2011+
assertThat(ex.getMessage(), equalTo("tripped"));
2012+
}
2013+
// for final
2014+
{
2015+
IntConsumer consumer = service.createForFinal();
2016+
for (int i = 0; i < 1023; i++) {
2017+
consumer.accept(0);
2018+
}
2019+
CircuitBreakingException ex = expectThrows(CircuitBreakingException.class, () -> consumer.accept(0));
2020+
assertThat(ex.getMessage(), equalTo("tripped"));
2021+
}
2022+
}
2023+
19882024
public void testCreateSearchContext() throws IOException {
19892025
String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
19902026
IndexService indexService = createIndex(index);

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,8 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
644644
bigArraysForReduction,
645645
getMockScriptService(),
646646
() -> false,
647-
builder
647+
builder,
648+
b -> {}
648649
);
649650
AggregatorCollectorManager aggregatorCollectorManager = new AggregatorCollectorManager(
650651
aggregatorSupplier,
@@ -669,7 +670,8 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
669670
bigArraysForReduction,
670671
getMockScriptService(),
671672
() -> false,
672-
builder
673+
builder,
674+
b -> {}
673675
);
674676
internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize));
675677
internalAggs.add(InternalAggregations.topLevelReduce(toReduce, reduceContext));

test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public static AggregationReduceContext.Builder emptyReduceContextBuilder(Aggrega
7777
return new AggregationReduceContext.Builder() {
7878
@Override
7979
public AggregationReduceContext forPartialReduction() {
80-
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, () -> false, aggs);
80+
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, () -> false, aggs, b -> {});
8181
}
8282

8383
@Override
@@ -95,7 +95,7 @@ public static AggregationReduceContext.Builder mockReduceContext(AggregationBuil
9595
return new AggregationReduceContext.Builder() {
9696
@Override
9797
public AggregationReduceContext forPartialReduction() {
98-
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, () -> false, agg);
98+
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, () -> false, agg, b -> {});
9999
}
100100

101101
@Override
@@ -244,7 +244,8 @@ public void testReduceRandom() throws IOException {
244244
bigArrays,
245245
mockScriptService,
246246
() -> false,
247-
inputs.builder()
247+
inputs.builder(),
248+
b -> {}
248249
);
249250
@SuppressWarnings("unchecked")
250251
T reduced = (T) reduce(toPartialReduce, context);

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public AggregationReduceContext forPartialReduction() {
128128
bigArrays,
129129
scriptService,
130130
((CancellableTask) task)::isCancelled,
131-
request.source().aggregations()
131+
request.source().aggregations(),
132+
b -> {}
132133
);
133134
}
134135

0 commit comments

Comments
 (0)