Skip to content

Commit 2a9c4fe

Browse files
authored
Event source started before events are processed (simple/naive syncing approach) (operator-framework#794)
1 parent 870eb7d commit 2a9c4fe

File tree

8 files changed

+162
-50
lines changed

8 files changed

+162
-50
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import java.util.HashMap;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.stream.Collectors;
7+
8+
import static io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState.NO_EVENT_PRESENT;
49

510
/**
611
* Manages the state of received events. Basically there can be only three distinct states relevant
@@ -23,7 +28,7 @@ public enum EventingState {
2328

2429
private EventingState getEventingState(ResourceID resourceID) {
2530
EventingState actualState = eventingState.get(resourceID);
26-
return actualState == null ? EventingState.NO_EVENT_PRESENT : actualState;
31+
return actualState == null ? NO_EVENT_PRESENT : actualState;
2732
}
2833

2934
private void setEventingState(ResourceID resourceID, EventingState state) {
@@ -46,7 +51,7 @@ public void unMarkEventReceived(ResourceID resourceID) {
4651
switch (actualState) {
4752
case EVENT_PRESENT:
4853
setEventingState(resourceID,
49-
EventingState.NO_EVENT_PRESENT);
54+
NO_EVENT_PRESENT);
5055
break;
5156
case DELETE_EVENT_PRESENT:
5257
throw new IllegalStateException("Cannot unmark delete event.");
@@ -72,10 +77,18 @@ public boolean eventPresent(ResourceID resourceID) {
7277

7378
public boolean noEventPresent(ResourceID resourceID) {
7479
var actualState = getEventingState(resourceID);
75-
return actualState == EventingState.NO_EVENT_PRESENT;
80+
return actualState == NO_EVENT_PRESENT;
7681
}
7782

7883
public void cleanup(ResourceID resourceID) {
7984
eventingState.remove(resourceID);
8085
}
86+
87+
public List<ResourceID> resourceIDsWithEventPresent() {
88+
return eventingState.entrySet().stream()
89+
.filter(e -> e.getValue() != NO_EVENT_PRESENT)
90+
.map(Map.Entry::getKey)
91+
.collect(Collectors.toList());
92+
}
93+
8194
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 73 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,38 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
5858
eventSourceManager.getController().getConfiguration().getRetryConfiguration()),
5959
eventSourceManager.getController().getConfiguration().getConfigurationService() == null
6060
? Metrics.NOOP
61-
: eventSourceManager.getController().getConfiguration().getConfigurationService()
61+
: eventSourceManager
62+
.getController()
63+
.getConfiguration()
64+
.getConfigurationService()
6265
.getMetrics(),
6366
eventSourceManager);
6467
}
6568

66-
EventProcessor(ReconciliationDispatcher<R> reconciliationDispatcher,
69+
EventProcessor(
70+
ReconciliationDispatcher<R> reconciliationDispatcher,
6771
EventSourceManager<R> eventSourceManager,
6872
String relatedControllerName,
6973
Retry retry) {
70-
this(eventSourceManager.getControllerResourceEventSource().getResourceCache(), null,
74+
this(
75+
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
76+
null,
7177
relatedControllerName,
72-
reconciliationDispatcher, retry, null, eventSourceManager);
78+
reconciliationDispatcher,
79+
retry,
80+
null,
81+
eventSourceManager);
7382
}
7483

75-
private EventProcessor(Cache<R> cache, ExecutorService executor,
84+
private EventProcessor(
85+
Cache<R> cache,
86+
ExecutorService executor,
7687
String relatedControllerName,
77-
ReconciliationDispatcher<R> reconciliationDispatcher, Retry retry, Metrics metrics,
88+
ReconciliationDispatcher<R> reconciliationDispatcher,
89+
Retry retry,
90+
Metrics metrics,
7891
EventSourceManager<R> eventSourceManager) {
79-
this.running = true;
92+
this.running = false;
8093
this.executor =
8194
executor == null
8295
? new ScheduledThreadPoolExecutor(
@@ -99,26 +112,31 @@ public void handleEvent(Event event) {
99112
lock.lock();
100113
try {
101114
log.debug("Received event: {}", event);
102-
if (!this.running) {
103-
log.debug("Skipping event: {} because the event handler is not started", event);
104-
return;
105-
}
115+
106116
final var resourceID = event.getRelatedCustomResourceID();
107117
MDCUtils.addResourceIDInfo(resourceID);
108118
metrics.receivedEvent(event);
109-
110119
handleEventMarking(event);
111-
if (!eventMarker.deleteEventPresent(resourceID)) {
112-
submitReconciliationExecution(resourceID);
113-
} else {
114-
cleanupForDeletedEvent(resourceID);
120+
if (!this.running) {
121+
// events are received and marked, but will be processed when started, see start() method.
122+
log.debug("Skipping event: {} because the event processor is not started", event);
123+
return;
115124
}
125+
handleMarkedEventForResource(resourceID);
116126
} finally {
117127
lock.unlock();
118128
MDCUtils.removeResourceIDInfo();
119129
}
120130
}
121131

132+
private void handleMarkedEventForResource(ResourceID resourceID) {
133+
if (!eventMarker.deleteEventPresent(resourceID)) {
134+
submitReconciliationExecution(resourceID);
135+
} else {
136+
cleanupForDeletedEvent(resourceID);
137+
}
138+
}
139+
122140
private void submitReconciliationExecution(ResourceID resourceID) {
123141
try {
124142
boolean controllerUnderExecution = isControllerUnderExecution(resourceID);
@@ -148,8 +166,8 @@ private void submitReconciliationExecution(ResourceID resourceID) {
148166
}
149167

150168
private void handleEventMarking(Event event) {
151-
if (event instanceof ResourceEvent &&
152-
((ResourceEvent) event).getAction() == ResourceAction.DELETED) {
169+
if (event instanceof ResourceEvent
170+
&& ((ResourceEvent) event).getAction() == ResourceAction.DELETED) {
153171
eventMarker.markDeleteEventReceived(event);
154172
} else if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) {
155173
eventMarker.markEventReceived(event);
@@ -177,10 +195,11 @@ void eventProcessingFinished(
177195
// If a delete event present at this phase, it was received during reconciliation.
178196
// So we either removed the finalizer during reconciliation or we don't use finalizers.
179197
// Either way we don't want to retry.
180-
if (isRetryConfigured() && postExecutionControl.exceptionDuringExecution() &&
181-
!eventMarker.deleteEventPresent(resourceID)) {
182-
handleRetryOnException(executionScope,
183-
postExecutionControl.getRuntimeException().orElseThrow());
198+
if (isRetryConfigured()
199+
&& postExecutionControl.exceptionDuringExecution()
200+
&& !eventMarker.deleteEventPresent(resourceID)) {
201+
handleRetryOnException(
202+
executionScope, postExecutionControl.getRuntimeException().orElseThrow());
184203
return;
185204
}
186205
cleanupOnSuccessfulExecution(executionScope);
@@ -195,8 +214,7 @@ void eventProcessingFinished(
195214
postponeReconciliationAndHandleCacheSyncEvent(resourceID);
196215
}
197216
} else {
198-
reScheduleExecutionIfInstructed(postExecutionControl,
199-
executionScope.getResource());
217+
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
200218
}
201219
}
202220
} finally {
@@ -208,20 +226,26 @@ private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID
208226
eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID);
209227
}
210228

211-
private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> executionScope,
212-
PostExecutionControl<R> postExecutionControl) {
229+
private boolean isCacheReadyForInstantReconciliation(
230+
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
213231
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
214232
return true;
215233
}
216234
String originalResourceVersion = getVersion(executionScope.getResource());
217-
String customResourceVersionAfterExecution = getVersion(postExecutionControl
218-
.getUpdatedCustomResource()
219-
.orElseThrow(() -> new IllegalStateException(
220-
"Updated custom resource must be present at this point of time")));
221-
String cachedCustomResourceVersion = getVersion(cache
222-
.get(executionScope.getCustomResourceID())
223-
.orElseThrow(() -> new IllegalStateException(
224-
"Cached custom resource must be present at this point")));
235+
String customResourceVersionAfterExecution =
236+
getVersion(
237+
postExecutionControl
238+
.getUpdatedCustomResource()
239+
.orElseThrow(
240+
() -> new IllegalStateException(
241+
"Updated custom resource must be present at this point of time")));
242+
String cachedCustomResourceVersion =
243+
getVersion(
244+
cache
245+
.get(executionScope.getCustomResourceID())
246+
.orElseThrow(
247+
() -> new IllegalStateException(
248+
"Cached custom resource must be present at this point")));
225249

226250
if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
227251
return true;
@@ -233,9 +257,10 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution
233257
return !cachedCustomResourceVersion.equals(originalResourceVersion);
234258
}
235259

236-
private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
237-
R customResource) {
238-
postExecutionControl.getReScheduleDelay()
260+
private void reScheduleExecutionIfInstructed(
261+
PostExecutionControl<R> postExecutionControl, R customResource) {
262+
postExecutionControl
263+
.getReScheduleDelay()
239264
.ifPresent(delay -> retryEventSource().scheduleOnce(customResource, delay));
240265
}
241266

@@ -248,16 +273,15 @@ TimerEventSource<R> retryEventSource() {
248273
* events (received meanwhile retry is in place or already in buffer) instantly or always wait
249274
* according to the retry timing if there was an exception.
250275
*/
251-
private void handleRetryOnException(ExecutionScope<R> executionScope,
252-
RuntimeException exception) {
276+
private void handleRetryOnException(
277+
ExecutionScope<R> executionScope, RuntimeException exception) {
253278
RetryExecution execution = getOrInitRetryExecution(executionScope);
254279
var customResourceID = executionScope.getCustomResourceID();
255280
boolean eventPresent = eventMarker.eventPresent(customResourceID);
256281
eventMarker.markEventReceived(customResourceID);
257282

258283
if (eventPresent) {
259-
log.debug("New events exists for for resource id: {}",
260-
customResourceID);
284+
log.debug("New events exists for for resource id: {}", customResourceID);
261285
submitReconciliationExecution(customResourceID);
262286
return;
263287
}
@@ -277,8 +301,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
277301

278302
private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
279303
log.debug(
280-
"Cleanup for successful execution for resource: {}",
281-
getName(executionScope.getResource()));
304+
"Cleanup for successful execution for resource: {}", getName(executionScope.getResource()));
282305
if (isRetryConfigured()) {
283306
retryState.remove(executionScope.getCustomResourceID());
284307
}
@@ -330,11 +353,18 @@ public void start() throws OperatorException {
330353
lock.lock();
331354
try {
332355
this.running = true;
356+
handleAlreadyMarkedEvents();
333357
} finally {
334358
lock.unlock();
335359
}
336360
}
337361

362+
private void handleAlreadyMarkedEvents() {
363+
for (ResourceID resourceID : eventMarker.resourceIDsWithEventPresent()) {
364+
handleMarkedEventForResource(resourceID);
365+
}
366+
}
367+
338368
private class ControllerExecution implements Runnable {
339369
private final ExecutionScope<R> executionScope;
340370

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,18 @@ public EventSourceManager(Controller<R> controller) {
5252
registerEventSource(controllerEventSource);
5353
}
5454

55+
/**
56+
* Starts the event sources first and then the processor. Note that it's not desired to start
57+
* processing events while the event sources are not "synced". This not fully started and the
58+
* caches propagated - although for non k8s related event sources this behavior might be different
59+
* (see
60+
* {@link io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource}).
61+
*
62+
* Now the event sources are also started sequentially, mainly because others might depend on
63+
* {@link ControllerResourceEventSource} , which is started first.
64+
*/
5565
@Override
56-
public void start() throws OperatorException {
57-
eventProcessor.start();
66+
public void start() {
5867
lock.lock();
5968
try {
6069
log.debug("Starting event sources.");
@@ -65,6 +74,7 @@ public void start() throws OperatorException {
6574
log.warn("Error starting {} -> {}", eventSource, e);
6675
}
6776
}
77+
eventProcessor.start();
6878
} finally {
6979
lock.unlock();
7080
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,16 @@ private static <T extends HasMetadata> PrimaryResourcesRetriever<T> fromMetadata
3737
return Collections.emptySet();
3838
} else {
3939
final var map = isLabel ? metadata.getLabels() : metadata.getAnnotations();
40+
if (map == null) {
41+
return Collections.emptySet();
42+
}
43+
var name = map.get(nameKey);
44+
if (name == null) {
45+
return Collections.emptySet();
46+
}
4047
var namespace =
4148
namespaceKey == null ? resource.getMetadata().getNamespace() : map.get(namespaceKey);
42-
return map != null ? Set.of(new ResourceID(map.get(nameKey), namespace))
43-
: Collections.emptySet();
49+
return Set.of(new ResourceID(name, namespace));
4450
}
4551
};
4652
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,18 @@ public void run() {
119119
}
120120
}
121121

122+
/**
123+
* When this event source is queried for the resource, it might not be fully "synced". Thus, the
124+
* cache might not be propagated, therefore the supplier is checked for the resource too.
125+
*
126+
* @param primary resource of the controller
127+
* @return the related resource for this event source
128+
*/
129+
@Override
130+
public Optional<T> getAssociated(R primary) {
131+
return getValueFromCacheOrSupplier(ResourceID.fromResource(primary));
132+
}
133+
122134
/**
123135
*
124136
* @param resourceID of the target related resource

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ public void stop() throws OperatorException {
5353
timer.cancel();
5454
}
5555

56+
/**
57+
* See {@link PerResourcePollingEventSource} for more info.
58+
*
59+
* @param primary custom resource
60+
* @return related resource
61+
*/
62+
@Override
63+
public Optional<T> getAssociated(P primary) {
64+
return getValueFromCacheOrSupplier(ResourceID.fromResource(primary));
65+
}
66+
5667
public Optional<T> getValueFromCacheOrSupplier(ResourceID resourceID) {
5768
var resource = getCachedValue(resourceID);
5869
if (resource.isPresent()) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventMarkerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class EventMarkerTest {
99

1010
private final EventMarker eventMarker = new EventMarker();
1111
private ResourceID sampleResourceID = new ResourceID("test-name");
12+
private ResourceID sampleResourceID2 = new ResourceID("test-name2");
1213

1314
@Test
1415
public void returnsNoEventPresentIfNotMarkedYet() {
@@ -62,4 +63,16 @@ public void cannotMarkEventAfterDeleteEventReceived() {
6263
});
6364
}
6465

66+
@Test
67+
public void listsResourceIDSWithEventsPresent() {
68+
eventMarker.markEventReceived(sampleResourceID);
69+
eventMarker.markEventReceived(sampleResourceID2);
70+
eventMarker.unMarkEventReceived(sampleResourceID);
71+
72+
var res = eventMarker.resourceIDsWithEventPresent();
73+
74+
assertThat(res).hasSize(1);
75+
assertThat(res).contains(sampleResourceID2);
76+
}
77+
6578
}

0 commit comments

Comments
 (0)