5
5
import java .util .List ;
6
6
import java .util .Map ;
7
7
import java .util .Optional ;
8
+ import java .util .concurrent .ConcurrentHashMap ;
9
+ import java .util .concurrent .atomic .AtomicInteger ;
8
10
9
11
import io .fabric8 .kubernetes .api .model .HasMetadata ;
10
12
import io .javaoperatorsdk .operator .OperatorException ;
11
13
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
12
14
import io .javaoperatorsdk .operator .api .reconciler .Constants ;
13
15
import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
16
+ import io .javaoperatorsdk .operator .processing .Controller ;
14
17
import io .javaoperatorsdk .operator .processing .GroupVersionKind ;
15
18
import io .javaoperatorsdk .operator .processing .event .Event ;
16
19
import io .javaoperatorsdk .operator .processing .event .ResourceID ;
17
20
import io .micrometer .core .instrument .MeterRegistry ;
21
+ import io .micrometer .core .instrument .Tag ;
18
22
import io .micrometer .core .instrument .Timer ;
19
23
24
+ import static io .javaoperatorsdk .operator .api .reconciler .Constants .CONTROLLER_NAME ;
25
+
20
26
public class MicrometerMetrics implements Metrics {
21
27
22
28
private static final String PREFIX = "operator.sdk." ;
23
29
private static final String RECONCILIATIONS = "reconciliations." ;
30
+ private static final String RECONCILIATIONS_EXECUTIONS = PREFIX + RECONCILIATIONS + "executions." ;
31
+ private static final String RECONCILIATIONS_QUEUE_SIZE = PREFIX + RECONCILIATIONS + "queue.size." ;
24
32
private final MeterRegistry registry ;
33
+ private final Map <String , AtomicInteger > gauges = new ConcurrentHashMap <>();
25
34
26
35
public MicrometerMetrics (MeterRegistry registry ) {
27
36
this .registry = registry ;
28
37
}
29
38
39
+ @ Override
40
+ public void controllerRegistered (Controller <?> controller ) {
41
+ String executingThreadsName =
42
+ RECONCILIATIONS_EXECUTIONS + controller .getConfiguration ().getName ();
43
+ AtomicInteger executingThreads =
44
+ registry .gauge (executingThreadsName ,
45
+ gvkTags (controller .getConfiguration ().getResourceClass ()),
46
+ new AtomicInteger (0 ));
47
+ gauges .put (executingThreadsName , executingThreads );
48
+
49
+ String controllerQueueName =
50
+ RECONCILIATIONS_QUEUE_SIZE + controller .getConfiguration ().getName ();
51
+ AtomicInteger controllerQueueSize =
52
+ registry .gauge (controllerQueueName ,
53
+ gvkTags (controller .getConfiguration ().getResourceClass ()),
54
+ new AtomicInteger (0 ));
55
+ gauges .put (controllerQueueName , controllerQueueSize );
56
+ }
57
+
30
58
public <T > T timeControllerExecution (ControllerExecution <T > execution ) {
31
59
final var name = execution .controllerName ();
32
60
final var execName = PREFIX + "controllers.execution." + execution .name ();
@@ -94,13 +122,35 @@ public void reconcileCustomResource(HasMetadata resource, RetryInfo retryInfoNul
94
122
"" + retryInfo .map (RetryInfo ::getAttemptCount ).orElse (0 ),
95
123
RECONCILIATIONS + "retries.last" ,
96
124
"" + retryInfo .map (RetryInfo ::isLastAttempt ).orElse (true ));
125
+
126
+ AtomicInteger controllerQueueSize =
127
+ gauges .get (RECONCILIATIONS_QUEUE_SIZE + metadata .get (CONTROLLER_NAME ));
128
+ controllerQueueSize .incrementAndGet ();
97
129
}
98
130
99
131
@ Override
100
132
public void finishedReconciliation (HasMetadata resource , Map <String , Object > metadata ) {
101
133
incrementCounter (ResourceID .fromResource (resource ), RECONCILIATIONS + "success" , metadata );
102
134
}
103
135
136
+ @ Override
137
+ public void reconciliationExecutionStarted (HasMetadata resource , Map <String , Object > metadata ) {
138
+ AtomicInteger reconcilerExecutions =
139
+ gauges .get (RECONCILIATIONS_EXECUTIONS + metadata .get (CONTROLLER_NAME ));
140
+ reconcilerExecutions .incrementAndGet ();
141
+ }
142
+
143
+ @ Override
144
+ public void reconciliationExecutionFinished (HasMetadata resource , Map <String , Object > metadata ) {
145
+ AtomicInteger reconcilerExecutions =
146
+ gauges .get (RECONCILIATIONS_EXECUTIONS + metadata .get (CONTROLLER_NAME ));
147
+ reconcilerExecutions .decrementAndGet ();
148
+
149
+ AtomicInteger controllerQueueSize =
150
+ gauges .get (RECONCILIATIONS_QUEUE_SIZE + metadata .get (CONTROLLER_NAME ));
151
+ controllerQueueSize .decrementAndGet ();
152
+ }
153
+
104
154
public void failedReconciliation (HasMetadata resource , Exception exception ,
105
155
Map <String , Object > metadata ) {
106
156
var cause = exception .getCause ();
@@ -118,6 +168,12 @@ public void failedReconciliation(HasMetadata resource, Exception exception,
118
168
return registry .gaugeMapSize (PREFIX + name + ".size" , Collections .emptyList (), map );
119
169
}
120
170
171
+ public static List <Tag > gvkTags (Class <? extends HasMetadata > resourceClass ) {
172
+ final var gvk = GroupVersionKind .gvkFor (resourceClass );
173
+ return List .of (Tag .of ("group" , gvk .group ), Tag .of ("version" , gvk .version ),
174
+ Tag .of ("kind" , gvk .kind ));
175
+ }
176
+
121
177
private void incrementCounter (ResourceID id , String counterName , Map <String , Object > metadata ,
122
178
String ... additionalTags ) {
123
179
final var additionalTagsNb =
0 commit comments