Skip to content

Commit f6e30d4

Browse files
authored
feat: operator can be restarted (operator-framework#1675)
1 parent 2fee15d commit f6e30d4

File tree

10 files changed

+185
-36
lines changed

10 files changed

+185
-36
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

+3
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,14 @@ public void stop() throws OperatorException {
132132
log.info(
133133
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
134134
controllerManager.stop();
135+
135136
ExecutorServiceManager.stop();
136137
leaderElectionManager.stop();
137138
if (configurationService.closeClientOnStop()) {
138139
kubernetesClient.close();
139140
}
141+
142+
started = false;
140143
}
141144

142145
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@
55
import java.util.Map;
66
import java.util.Optional;
77
import java.util.concurrent.ExecutorService;
8-
import java.util.concurrent.ScheduledThreadPoolExecutor;
98

109
import org.slf4j.Logger;
1110
import org.slf4j.LoggerFactory;
1211

1312
import io.fabric8.kubernetes.api.model.HasMetadata;
1413
import io.javaoperatorsdk.operator.OperatorException;
15-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1614
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1715
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1816
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
@@ -40,20 +38,18 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
4038
private final ControllerConfiguration<?> controllerConfiguration;
4139
private final ReconciliationDispatcher<P> reconciliationDispatcher;
4240
private final Retry retry;
43-
private final ExecutorService executor;
4441
private final Metrics metrics;
4542
private final Cache<P> cache;
4643
private final EventSourceManager<P> eventSourceManager;
4744
private final RateLimiter<? extends RateLimitState> rateLimiter;
4845
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
4946
private final Map<String, Object> metricsMetadata;
50-
47+
private ExecutorService executor;
5148

5249
public EventProcessor(EventSourceManager<P> eventSourceManager) {
5350
this(
5451
eventSourceManager.getController().getConfiguration(),
5552
eventSourceManager.getControllerResourceEventSource(),
56-
ExecutorServiceManager.instance().executorService(),
5753
new ReconciliationDispatcher<>(eventSourceManager.getController()),
5854
ConfigurationServiceProvider.instance().getMetrics(),
5955
eventSourceManager);
@@ -68,7 +64,6 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
6864
this(
6965
controllerConfiguration,
7066
eventSourceManager.getControllerResourceEventSource(),
71-
null,
7267
reconciliationDispatcher,
7368
metrics,
7469
eventSourceManager);
@@ -78,17 +73,11 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
7873
private EventProcessor(
7974
ControllerConfiguration controllerConfiguration,
8075
Cache<P> cache,
81-
ExecutorService executor,
8276
ReconciliationDispatcher<P> reconciliationDispatcher,
8377
Metrics metrics,
8478
EventSourceManager<P> eventSourceManager) {
8579
this.controllerConfiguration = controllerConfiguration;
8680
this.running = false;
87-
this.executor =
88-
executor == null
89-
? new ScheduledThreadPoolExecutor(
90-
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
91-
: executor;
9281
this.reconciliationDispatcher = reconciliationDispatcher;
9382
this.retry = controllerConfiguration.getRetry();
9483
this.cache = cache;
@@ -376,6 +365,8 @@ public synchronized void stop() {
376365

377366
@Override
378367
public void start() throws OperatorException {
368+
// on restart new executor service is created and needs to be set here
369+
executor = ExecutorServiceManager.instance().executorService();
379370
this.running = true;
380371
handleAlreadyMarkedEvents();
381372
}
@@ -424,7 +415,8 @@ public void run() {
424415

425416
@Override
426417
public String toString() {
427-
return controllerName() + " -> " + executionScope;
418+
return controllerName() + " -> "
419+
+ (executionScope.getResource() != null ? executionScope : resourceID);
428420
}
429421
}
430422

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ public synchronized void stop() {
9797
eventSources.additionalNamedEventSources(),
9898
this::stopEventSource,
9999
getThreadNamer("stop"));
100-
eventSources.clear();
101100
}
102101

103102
@SuppressWarnings("rawtypes")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,31 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4141

4242
private final Map<String, InformerWrapper<T>> sources = new ConcurrentHashMap<>();
4343
private Cloner cloner;
44-
private C configuration;
45-
private MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
46-
private ResourceEventHandler<T> eventHandler;
44+
private final C configuration;
45+
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
46+
private final ResourceEventHandler<T> eventHandler;
4747
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();
4848

49+
public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
50+
C configuration,
51+
ResourceEventHandler<T> eventHandler) {
52+
this.client = client;
53+
this.configuration = configuration;
54+
this.eventHandler = eventHandler;
55+
}
56+
4957
@Override
5058
public void start() throws OperatorException {
59+
initSources();
5160
// make sure informers are all started before proceeding further
5261
sources.values().parallelStream().forEach(InformerWrapper::start);
5362
}
5463

55-
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
56-
C configuration, ResourceEventHandler<T> eventHandler) {
64+
private void initSources() {
65+
if (!sources.isEmpty()) {
66+
throw new IllegalStateException("Some sources already initialized.");
67+
}
5768
cloner = ConfigurationServiceProvider.instance().getResourceCloner();
58-
this.configuration = configuration;
59-
this.client = client;
60-
this.eventHandler = eventHandler;
61-
6269
final var targetNamespaces = configuration.getEffectiveNamespaces();
6370
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
6471
var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES);
@@ -86,7 +93,6 @@ public void changeNamespaces(Set<String> namespaces) {
8693
namespaces.forEach(ns -> {
8794
if (!sources.containsKey(ns)) {
8895
final InformerWrapper<T> source = createEventSourceForNamespace(ns);
89-
source.addIndexers(this.indexers);
9096
source.start();
9197
log.debug("Registered new {} -> {} for namespace: {}", this, source,
9298
ns);
@@ -106,6 +112,7 @@ private InformerWrapper<T> createEventSourceForNamespace(String namespace) {
106112
client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()),
107113
eventHandler, namespace);
108114
}
115+
source.addIndexers(indexers);
109116
return source;
110117
}
111118

@@ -130,6 +137,7 @@ public void stop() {
130137
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
131138
}
132139
});
140+
sources.clear();
133141
}
134142

135143
@Override
@@ -177,7 +185,6 @@ private Optional<InformerWrapper<T>> getSource(String namespace) {
177185
@Override
178186
public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
179187
this.indexers.putAll(indexers);
180-
sources.values().forEach(s -> s.addIndexers(indexers));
181188
}
182189

183190
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
3737
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
3838

3939
protected TemporaryResourceCache<R> temporaryResourceCache;
40-
protected InformerManager<R, C> cache = new InformerManager<>();
40+
protected InformerManager<R, C> cache;
4141
protected C configuration;
4242

4343
protected ManagedInformerEventSource(
4444
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
4545
super(configuration.getResourceClass());
4646
temporaryResourceCache = new TemporaryResourceCache<>(this);
47-
manager().initSources(client, configuration, this);
47+
cache = new InformerManager<>(client, configuration, this);
4848
this.configuration = configuration;
4949
}
5050

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java

+13-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.Timer;
55
import java.util.TimerTask;
66
import java.util.concurrent.ConcurrentHashMap;
7-
import java.util.concurrent.atomic.AtomicBoolean;
87

98
import org.slf4j.Logger;
109
import org.slf4j.LoggerFactory;
@@ -20,16 +19,16 @@ public class TimerEventSource<R extends HasMetadata>
2019
implements ResourceEventAware<R> {
2120
private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class);
2221

23-
private final Timer timer = new Timer(true);
24-
private final AtomicBoolean running = new AtomicBoolean();
22+
private Timer timer;
2523
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();
2624

25+
@SuppressWarnings("unused")
2726
public void scheduleOnce(R resource, long delay) {
2827
scheduleOnce(ResourceID.fromResource(resource), delay);
2928
}
3029

3130
public void scheduleOnce(ResourceID resourceID, long delay) {
32-
if (!running.get()) {
31+
if (!isRunning()) {
3332
throw new IllegalStateException("The TimerEventSource is not running");
3433
}
3534

@@ -55,14 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) {
5554

5655
@Override
5756
public void start() {
58-
running.set(true);
57+
if (!isRunning()) {
58+
super.start();
59+
timer = new Timer(true);
60+
}
5961
}
6062

6163
@Override
6264
public void stop() {
63-
running.set(false);
64-
onceTasks.keySet().forEach(this::cancelOnceSchedule);
65-
timer.cancel();
65+
if (isRunning()) {
66+
onceTasks.keySet().forEach(this::cancelOnceSchedule);
67+
timer.cancel();
68+
super.stop();
69+
}
6670
}
6771

6872
public class EventProducerTimeTask extends TimerTask {
@@ -75,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) {
7579

7680
@Override
7781
public void run() {
78-
if (running.get()) {
82+
if (isRunning()) {
7983
log.debug("Producing event for custom resource id: {}", customResourceUid);
8084
getEventHandler().handleEvent(new Event(customResourceUid));
8185
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import org.junit.jupiter.api.AfterEach;
4+
import org.junit.jupiter.api.BeforeAll;
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Order;
7+
import org.junit.jupiter.api.Test;
8+
9+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
10+
import io.fabric8.kubernetes.client.KubernetesClient;
11+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
12+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
13+
import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource;
14+
import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.awaitility.Awaitility.await;
18+
19+
class OperatorRestartIT {
20+
private final static KubernetesClient client = new KubernetesClientBuilder().build();
21+
private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false));
22+
private final static RestartTestReconciler reconciler = new RestartTestReconciler();
23+
private static int reconcileNumberBeforeStop = 0;
24+
25+
@BeforeAll
26+
static void registerReconciler() {
27+
LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client);
28+
operator.register(reconciler);
29+
}
30+
31+
@BeforeEach
32+
void startOperator() {
33+
operator.start();
34+
}
35+
36+
@AfterEach
37+
void stopOperator() {
38+
operator.stop();
39+
}
40+
41+
@Test
42+
@Order(1)
43+
void createResource() {
44+
client.resource(testCustomResource()).createOrReplace();
45+
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0));
46+
reconcileNumberBeforeStop = reconciler.getNumberOfExecutions();
47+
}
48+
49+
@Test
50+
@Order(2)
51+
void reconcile() {
52+
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions())
53+
.isGreaterThan(reconcileNumberBeforeStop));
54+
}
55+
56+
RestartTestCustomResource testCustomResource() {
57+
RestartTestCustomResource cr = new RestartTestCustomResource();
58+
cr.setMetadata(new ObjectMetaBuilder()
59+
.withName("test1")
60+
.build());
61+
return cr;
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.javaoperatorsdk.operator.sample.restart;
2+
3+
import java.util.Map;
4+
5+
import io.fabric8.kubernetes.api.model.ConfigMap;
6+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
7+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
8+
import io.javaoperatorsdk.operator.api.reconciler.Context;
9+
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource;
10+
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
11+
12+
@KubernetesDependent(labelSelector = "app=restart-test")
13+
public class ConfigMapDependentResource
14+
extends CRUDKubernetesDependentResource<ConfigMap, RestartTestCustomResource> {
15+
16+
public static final String DATA_KEY = "key";
17+
18+
public ConfigMapDependentResource() {
19+
super(ConfigMap.class);
20+
}
21+
22+
@Override
23+
protected ConfigMap desired(RestartTestCustomResource primary,
24+
Context<RestartTestCustomResource> context) {
25+
return new ConfigMapBuilder()
26+
.withMetadata(new ObjectMetaBuilder()
27+
.withLabels(Map.of("app", "restart-test"))
28+
.withName(primary.getMetadata().getName())
29+
.withNamespace(primary.getMetadata().getNamespace())
30+
.build())
31+
.withData(Map.of(DATA_KEY, primary.getMetadata().getName()))
32+
.build();
33+
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.restart;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("rt")
12+
public class RestartTestCustomResource
13+
extends CustomResource<Void, Void>
14+
implements Namespaced {
15+
}

0 commit comments

Comments
 (0)