18
18
import org .elasticsearch .action .datastreams .GetDataStreamAction ;
19
19
import org .elasticsearch .action .index .IndexRequest ;
20
20
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
21
+ import org .elasticsearch .cluster .ClusterState ;
22
+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
21
23
import org .elasticsearch .cluster .metadata .ComposableIndexTemplate ;
22
24
import org .elasticsearch .cluster .metadata .DataStream ;
25
+ import org .elasticsearch .cluster .service .ClusterService ;
23
26
import org .elasticsearch .plugins .Plugin ;
24
27
import org .elasticsearch .test .ESIntegTestCase ;
25
- import org .elasticsearch .test .disruption .IntermittentLongGCDisruption ;
26
- import org .elasticsearch .test .disruption .SingleNodeDisruption ;
27
28
import org .elasticsearch .xcontent .XContentType ;
28
29
29
30
import java .util .Collection ;
30
31
import java .util .List ;
31
32
import java .util .concurrent .CountDownLatch ;
33
+ import java .util .concurrent .CyclicBarrier ;
32
34
import java .util .concurrent .ExecutionException ;
33
35
34
36
import static org .hamcrest .Matchers .equalTo ;
@@ -43,25 +45,38 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
43
45
}
44
46
45
47
public void testRolloverIsExecutedOnce () throws ExecutionException , InterruptedException {
46
- String masterNode = internalCluster ().startMasterOnlyNode ();
48
+ internalCluster ().startMasterOnlyNode ();
47
49
internalCluster ().startDataOnlyNodes (3 );
48
50
ensureStableCluster (4 );
49
51
50
52
String dataStreamName = "my-data-stream" ;
51
53
createDataStream (dataStreamName );
52
54
53
55
// Mark it to lazy rollover
54
- new RolloverRequestBuilder (client ()).setRolloverTarget (dataStreamName ).lazy (true ).execute (). get ( );
56
+ safeGet ( new RolloverRequestBuilder (client ()).setRolloverTarget (dataStreamName ).lazy (true ).execute ());
55
57
56
58
// Verify that the data stream is marked for rollover and that it has currently one index
57
59
DataStream dataStream = getDataStream (dataStreamName );
58
60
assertThat (dataStream .rolloverOnWrite (), equalTo (true ));
59
61
assertThat (dataStream .getBackingIndices ().getIndices ().size (), equalTo (1 ));
60
62
61
63
// Introduce a disruption to the master node that should delay the rollover execution
62
- SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption (random (), masterNode , 100 , 200 , 30000 , 60000 );
63
- internalCluster ().setDisruptionScheme (masterNodeDisruption );
64
- masterNodeDisruption .startDisrupting ();
64
+ final var barrier = new CyclicBarrier (2 );
65
+ internalCluster ().getCurrentMasterNodeInstance (ClusterService .class )
66
+ .submitUnbatchedStateUpdateTask ("block" , new ClusterStateUpdateTask () {
67
+ @ Override
68
+ public ClusterState execute (ClusterState currentState ) {
69
+ safeAwait (barrier );
70
+ safeAwait (barrier );
71
+ return currentState ;
72
+ }
73
+
74
+ @ Override
75
+ public void onFailure (Exception e ) {
76
+ fail (e );
77
+ }
78
+ });
79
+ safeAwait (barrier );
65
80
66
81
// Start indexing operations
67
82
int docs = randomIntBetween (5 , 10 );
@@ -84,10 +99,10 @@ public void onFailure(Exception e) {
84
99
}
85
100
86
101
// End the disruption so that all pending tasks will complete
87
- masterNodeDisruption . stopDisrupting ( );
102
+ safeAwait ( barrier );
88
103
89
104
// Wait for all the indexing requests to be processed successfully
90
- countDownLatch . await ( );
105
+ safeAwait ( countDownLatch );
91
106
92
107
// Verify that the rollover has happened once
93
108
dataStream = getDataStream (dataStreamName );
@@ -96,10 +111,12 @@ public void onFailure(Exception e) {
96
111
}
97
112
98
113
private DataStream getDataStream (String dataStreamName ) {
99
- return client ().execute (
100
- GetDataStreamAction .INSTANCE ,
101
- new GetDataStreamAction .Request (TEST_REQUEST_TIMEOUT , new String [] { dataStreamName })
102
- ).actionGet ().getDataStreams ().get (0 ).getDataStream ();
114
+ return safeGet (
115
+ client ().execute (
116
+ GetDataStreamAction .INSTANCE ,
117
+ new GetDataStreamAction .Request (TEST_REQUEST_TIMEOUT , new String [] { dataStreamName })
118
+ )
119
+ ).getDataStreams ().get (0 ).getDataStream ();
103
120
}
104
121
105
122
private void createDataStream (String dataStreamName ) throws InterruptedException , ExecutionException {
@@ -111,19 +128,19 @@ private void createDataStream(String dataStreamName) throws InterruptedException
111
128
.dataStreamTemplate (new ComposableIndexTemplate .DataStreamTemplate (false , false ))
112
129
.build ()
113
130
);
114
- final AcknowledgedResponse putComposableTemplateResponse = client ().execute (
115
- TransportPutComposableIndexTemplateAction .TYPE ,
116
- putComposableTemplateRequest
117
- ).actionGet ();
131
+ final AcknowledgedResponse putComposableTemplateResponse = safeGet (
132
+ client ().execute (TransportPutComposableIndexTemplateAction .TYPE , putComposableTemplateRequest )
133
+ );
118
134
assertThat (putComposableTemplateResponse .isAcknowledged (), is (true ));
119
135
120
136
final CreateDataStreamAction .Request createDataStreamRequest = new CreateDataStreamAction .Request (
121
137
TEST_REQUEST_TIMEOUT ,
122
138
TEST_REQUEST_TIMEOUT ,
123
139
dataStreamName
124
140
);
125
- final AcknowledgedResponse createDataStreamResponse = client ().execute (CreateDataStreamAction .INSTANCE , createDataStreamRequest )
126
- .get ();
141
+ final AcknowledgedResponse createDataStreamResponse = safeGet (
142
+ client ().execute (CreateDataStreamAction .INSTANCE , createDataStreamRequest )
143
+ );
127
144
assertThat (createDataStreamResponse .isAcknowledged (), is (true ));
128
145
}
129
146
}
0 commit comments