Skip to content

Commit 58ff29e

Browse files
authored
[Failure store - selector syntax] Introduce the ::* selector (#115389) (#115507)
**Introduction** > In order to make adoption of failure stores simpler for all users, we are introducing a new syntactical feature to index expression resolution: The selector. > > Selectors, denoted with a :: followed by a recognized suffix will allow users to specify which component of an index abstraction they would like to operate on within an API call. In this case, an index abstraction is a concrete index, data stream, or alias; Any abstraction that can be resolved to a set of indices/shards. We define a component of an index abstraction to be some searchable unit of the index abstraction. > > To start, we will support two components: data and failures. Concrete indices are their own data components, while the data component for index aliases are all of the indices contained therein. For data streams, the data component corresponds to their backing indices. Data stream aliases mirror this, treating all backing indices of the data streams they correspond to as their data component. > > The failure component is only supported by data streams and data stream aliases. The failure component of these abstractions refer to the data streams' failure stores. Indices and index aliases do not have a failure component. For more details and examples see #113144. All this work has been cherry picked from there. **Purpose of this PR** This PR is introducing the `::*` as another selector option and not as a combination of `::data` and `::failure`. The reason for this change is that we need to differentiate between: - `my-index::*` which should resolve to `my-index::data` only and not to `my-index::failures` and - a user explicitly requesting `my-index::data, my-index::failures` which should result potentially to an error.
1 parent 4a20067 commit 58ff29e

File tree

21 files changed

+197
-146
lines changed

21 files changed

+197
-146
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void setup() throws Exception {
132132
// Initialize the failure store.
133133
RolloverRequest rolloverRequest = new RolloverRequest("with-fs", null);
134134
rolloverRequest.setIndicesOptions(
135-
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES).build()
135+
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
136136
);
137137
response = client.execute(RolloverAction.INSTANCE, rolloverRequest).get();
138138
assertTrue(response.isAcknowledged());

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void testRejectionFromFailureStore() throws IOException {
198198
// Initialize failure store.
199199
var rolloverRequest = new RolloverRequest(dataStream, null);
200200
rolloverRequest.setIndicesOptions(
201-
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES).build()
201+
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
202202
);
203203
var rolloverResponse = client().execute(RolloverAction.INSTANCE, rolloverRequest).actionGet();
204204
var failureStoreIndex = rolloverResponse.getNewIndex();

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indice
946946
UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
947947
updateMergePolicySettingsRequest.indicesOptions(
948948
IndicesOptions.builder(updateMergePolicySettingsRequest.indicesOptions())
949-
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
949+
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
950950
.build()
951951
);
952952
updateMergePolicySettingsRequest.indices(indexName);
@@ -1408,9 +1408,7 @@ static RolloverRequest getDefaultRolloverRequest(
14081408
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
14091409
if (rolloverFailureStore) {
14101410
rolloverRequest.setIndicesOptions(
1411-
IndicesOptions.builder(rolloverRequest.indicesOptions())
1412-
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
1413-
.build()
1411+
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
14141412
);
14151413
}
14161414
rolloverRequest.setConditions(rolloverConfiguration.resolveRolloverConditions(dataRetention));

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,11 @@ public void testOperationsExecutedOnce() {
225225
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
226226
RolloverRequest rolloverBackingIndexRequest = (RolloverRequest) clientSeenRequests.get(0);
227227
assertThat(rolloverBackingIndexRequest.getRolloverTarget(), is(dataStreamName));
228-
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_DATA));
228+
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.DATA));
229229
assertThat(clientSeenRequests.get(1), instanceOf(RolloverRequest.class));
230230
RolloverRequest rolloverFailureIndexRequest = (RolloverRequest) clientSeenRequests.get(1);
231231
assertThat(rolloverFailureIndexRequest.getRolloverTarget(), is(dataStreamName));
232-
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_FAILURES));
232+
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.FAILURES));
233233
List<DeleteIndexRequest> deleteRequests = clientSeenRequests.subList(2, 5)
234234
.stream()
235235
.map(transportRequest -> (DeleteIndexRequest) transportRequest)
@@ -1543,11 +1543,11 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
15431543
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
15441544
RolloverRequest rolloverBackingIndexRequest = (RolloverRequest) clientSeenRequests.get(0);
15451545
assertThat(rolloverBackingIndexRequest.getRolloverTarget(), is(dataStreamName));
1546-
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_DATA));
1546+
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.DATA));
15471547
assertThat(clientSeenRequests.get(1), instanceOf(RolloverRequest.class));
15481548
RolloverRequest rolloverFailureIndexRequest = (RolloverRequest) clientSeenRequests.get(1);
15491549
assertThat(rolloverFailureIndexRequest.getRolloverTarget(), is(dataStreamName));
1550-
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_FAILURES));
1550+
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.FAILURES));
15511551
assertThat(
15521552
((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0],
15531553
is(dataStream.getFailureIndices().getIndices().get(0).getName())

server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ static TransportVersion def(int id) {
180180
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
181181
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ = def(8_776_00_0);
182182
public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
183+
public static final TransportVersion INTRODUCE_ALL_APPLICABLE_SELECTOR = def(8_778_00_0);
183184

184185
/*
185186
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public GetIndexRequest() {
9898
super(
9999
DataStream.isFailureStoreFeatureFlagEnabled()
100100
? IndicesOptions.builder(IndicesOptions.strictExpandOpen())
101-
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
101+
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
102102
.build()
103103
: IndicesOptions.strictExpandOpen()
104104
);

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.IndicesRequest;
1414
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1515
import org.elasticsearch.action.support.ActiveShardCount;
16+
import org.elasticsearch.action.support.IndexComponentSelector;
1617
import org.elasticsearch.action.support.IndicesOptions;
1718
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1819
import org.elasticsearch.cluster.metadata.DataStream;
@@ -147,8 +148,8 @@ public ActionRequestValidationException validate() {
147148
);
148149
}
149150

150-
var selectors = indicesOptions.selectorOptions().defaultSelectors();
151-
if (selectors.size() > 1) {
151+
var selector = indicesOptions.selectorOptions().defaultSelector();
152+
if (selector == IndexComponentSelector.ALL_APPLICABLE) {
152153
validationException = addValidationError(
153154
"rollover cannot be applied to both regular and failure indices at the same time",
154155
validationException

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private void rollOverFailureStores(Runnable runnable) {
212212
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null);
213213
rolloverRequest.setIndicesOptions(
214214
IndicesOptions.builder(rolloverRequest.indicesOptions())
215-
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
215+
.selectorOptions(IndicesOptions.SelectorOptions.FAILURES)
216216
.build()
217217
);
218218
// We are executing a lazy rollover because it is an action specialised for this situation, when we want an

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ private void rollOverDataStreams(
418418
if (targetFailureStore) {
419419
rolloverRequest.setIndicesOptions(
420420
IndicesOptions.builder(rolloverRequest.indicesOptions())
421-
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
421+
.selectorOptions(IndicesOptions.SelectorOptions.FAILURES)
422422
.build()
423423
);
424424
}

server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsStatsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public Request() {
6161
.allowFailureIndices(true)
6262
.build()
6363
)
64-
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
64+
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
6565
.build()
6666
);
6767
}

server/src/main/java/org/elasticsearch/action/support/IndexComponentSelector.java

+64-9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
package org.elasticsearch.action.support;
1111

12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.core.Nullable;
16+
17+
import java.io.IOException;
1218
import java.util.Collections;
1319
import java.util.HashMap;
1420
import java.util.Map;
@@ -17,33 +23,82 @@
1723
* We define as index components the two different sets of indices a data stream could consist of:
1824
* - DATA: represents the backing indices
1925
* - FAILURES: represent the failing indices
26+
* - ALL: represents all available in this expression components, meaning if it's a data stream both backing and failure indices and if it's
27+
* an index only the index itself.
2028
* Note: An index is its own DATA component, but it cannot have a FAILURE component.
2129
*/
22-
public enum IndexComponentSelector {
23-
DATA("data"),
24-
FAILURES("failures");
30+
public enum IndexComponentSelector implements Writeable {
31+
DATA("data", (byte) 0),
32+
FAILURES("failures", (byte) 1),
33+
ALL_APPLICABLE("*", (byte) 2);
2534

2635
private final String key;
36+
private final byte id;
2737

28-
IndexComponentSelector(String key) {
38+
IndexComponentSelector(String key, byte id) {
2939
this.key = key;
40+
this.id = id;
3041
}
3142

3243
public String getKey() {
3344
return key;
3445
}
3546

36-
private static final Map<String, IndexComponentSelector> REGISTRY;
47+
public byte getId() {
48+
return id;
49+
}
50+
51+
private static final Map<String, IndexComponentSelector> KEY_REGISTRY;
52+
private static final Map<Byte, IndexComponentSelector> ID_REGISTRY;
3753

3854
static {
39-
Map<String, IndexComponentSelector> registry = new HashMap<>(IndexComponentSelector.values().length);
55+
Map<String, IndexComponentSelector> keyRegistry = new HashMap<>(IndexComponentSelector.values().length);
56+
for (IndexComponentSelector value : IndexComponentSelector.values()) {
57+
keyRegistry.put(value.getKey(), value);
58+
}
59+
KEY_REGISTRY = Collections.unmodifiableMap(keyRegistry);
60+
Map<Byte, IndexComponentSelector> idRegistry = new HashMap<>(IndexComponentSelector.values().length);
4061
for (IndexComponentSelector value : IndexComponentSelector.values()) {
41-
registry.put(value.getKey(), value);
62+
idRegistry.put(value.getId(), value);
4263
}
43-
REGISTRY = Collections.unmodifiableMap(registry);
64+
ID_REGISTRY = Collections.unmodifiableMap(idRegistry);
4465
}
4566

67+
/**
68+
* Retrieves the respective selector when the suffix key is recognised
69+
* @param key the suffix key, probably parsed from an expression
70+
* @return the selector or null if the key was not recognised.
71+
*/
72+
@Nullable
4673
public static IndexComponentSelector getByKey(String key) {
47-
return REGISTRY.get(key);
74+
return KEY_REGISTRY.get(key);
75+
}
76+
77+
public static IndexComponentSelector read(StreamInput in) throws IOException {
78+
return getById(in.readByte());
79+
}
80+
81+
// Visible for testing
82+
static IndexComponentSelector getById(byte id) {
83+
IndexComponentSelector indexComponentSelector = ID_REGISTRY.get(id);
84+
if (indexComponentSelector == null) {
85+
throw new IllegalArgumentException(
86+
"Unknown id of index component selector [" + id + "], available options are: " + ID_REGISTRY
87+
);
88+
}
89+
return indexComponentSelector;
90+
}
91+
92+
@Override
93+
public void writeTo(StreamOutput out) throws IOException {
94+
out.writeByte(id);
95+
}
96+
97+
public boolean shouldIncludeData() {
98+
return this == ALL_APPLICABLE || this == DATA;
99+
}
100+
101+
public boolean shouldIncludeFailures() {
102+
return this == ALL_APPLICABLE || this == FAILURES;
48103
}
49104
}

0 commit comments

Comments
 (0)