-
Notifications
You must be signed in to change notification settings - Fork 531
/
Copy pathmetrics.go
285 lines (250 loc) · 6.92 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package expvar
import (
"context"
"errors"
"sync"
"time"
"go.uber.org/zap"
)
// Metric identifies one value tracked by a Collector.
type Metric int

// Metrics tracked by Collector. The values are assigned with iota, so each
// constant's numeric value depends on its position in this list; reordering
// or inserting entries renumbers every constant that follows.
const (
	// Bytes is fed from the snapshot's UncompressedBytes field.
	Bytes Metric = iota
	// MemAllocs is fed from the snapshot's Mallocs field.
	MemAllocs
	// MemBytes is fed from the snapshot's TotalAlloc field.
	MemBytes
	ActiveEvents
	TotalEvents
	IntakeEventsAccepted
	IntakeEventsErrorsInvalid
	IntakeEventsErrorsTooLarge
	TransactionsProcessed
	SpansProcessed
	MetricsProcessed
	ErrorsProcessed
	NumGC
	RSSMemoryBytes
	Goroutines
	HeapAlloc
	HeapObjects
	AvailableBulkRequests
	ErrorElasticResponses
	ErrorOTLPTracesResponses
	ErrorOTLPMetricsResponses
	TBSLsmSize
	TBSVlogSize
)
// AggregateStats summarises every sample a Collector has observed for a
// single metric.
type AggregateStats struct {
	// First and Last are the earliest and the most recent observed values.
	First, Last int64
	// Min and Max are the smallest and largest observed values.
	Min, Max int64
	// Mean is the running arithmetic mean of all observed values.
	Mean float64
	// samples counts how many values have been folded into these stats.
	samples int64
}
// watchItem is a pending one-shot watch registered through WatchMetric.
type watchItem struct {
	// notifyChan receives true once an accepted value is observed and is then
	// closed; it is closed without a value if the collector stops first.
	notifyChan chan<- bool
	// validator reports whether an observed value satisfies the watch.
	validator func(int64) bool
}
// Collector queries an expvar endpoint periodically and aggregates the
// collected metrics.
//
// A watch (see WatchMetric) is a one-time hook into the collector that
// reports when a specific value of a metric has been observed.
type Collector struct {
	// l guards metrics, watches and stopped.
	l sync.Mutex
	// metrics holds the running aggregate for every metric seen so far.
	metrics map[Metric]AggregateStats
	// watches holds at most one pending watch per metric.
	watches map[Metric]watchItem
	// stopped is set by cleanup once collection has ended; WatchMetric
	// rejects new watches afterwards.
	stopped bool
	// logger reports the query error that ends background collection.
	logger *zap.Logger
}
// StartNewCollector creates a new collector and starts querying the expvar
// endpoint at serverURL every period.
//
// An initial snapshot is taken synchronously; if that query fails, the error
// is returned together with a nil collector. Otherwise, collection continues
// in the background until ctx is cancelled or a periodic query fails.
func StartNewCollector(
	ctx context.Context,
	serverURL string,
	period time.Duration,
	logger *zap.Logger,
) (*Collector, error) {
	c := &Collector{
		metrics: make(map[Metric]AggregateStats),
		watches: make(map[Metric]watchItem),
		logger:  logger,
	}
	if err := c.start(ctx, serverURL, period); err != nil {
		// The collector has already been cleaned up by start; do not hand a
		// stopped, partially-initialised value back to the caller.
		return nil, err
	}
	return c, nil
}
// Get returns a copy of the aggregated statistics recorded for metric m.
// A metric that has never been observed yields the zero AggregateStats.
func (c *Collector) Get(m Metric) AggregateStats {
	c.l.Lock()
	snapshot := c.metrics[m]
	c.l.Unlock()
	return snapshot
}
// Delta reports how much metric m has changed between the first and the most
// recent value observed by the collector. It is 0 for an unobserved metric.
func (c *Collector) Delta(m Metric) int64 {
	c.l.Lock()
	defer c.l.Unlock()

	if agg, seen := c.metrics[m]; seen {
		return agg.Last - agg.First
	}
	return 0
}
// WatchMetric registers a one-shot watch that fires when metric m is next
// observed with exactly the value expected. Only values accumulated after
// registration are checked. At most one watch may exist per metric, and no
// watch can be added once the collector has stopped.
//
// The returned read-only channel ends the watch in one of two ways: it
// delivers true (and is then closed) when the expected value is observed, or
// it is closed without a value when the collector stops first.
func (c *Collector) WatchMetric(m Metric, expected int64) (<-chan bool, error) {
	c.l.Lock()
	defer c.l.Unlock()

	switch _, duplicate := c.watches[m]; {
	case c.stopped:
		return nil, errors.New("collector has stopped")
	case duplicate:
		return nil, errors.New("watch already exists for the given metric")
	}

	// Buffered so the collector can deliver the result without blocking.
	notify := make(chan bool, 1)
	matches := func(observed int64) bool { return observed == expected }
	c.watches[m] = watchItem{validator: matches, notifyChan: notify}
	return notify, nil
}
// accumulate folds one expvar snapshot into the per-metric aggregates. The
// metrics are processed in a fixed order while holding the collector lock.
func (c *Collector) accumulate(e expvar) {
	readings := [...]struct {
		metric Metric
		value  int64
	}{
		{Goroutines, e.Goroutines},
		{Bytes, e.UncompressedBytes},
		{TotalEvents, e.TotalEvents},
		{ActiveEvents, e.ActiveEvents},
		{RSSMemoryBytes, e.RSSMemoryBytes},
		{AvailableBulkRequests, e.AvailableBulkRequests},
		{IntakeEventsAccepted, e.IntakeEventsAccepted},
		{IntakeEventsErrorsInvalid, e.IntakeEventsErrorsInvalid},
		{IntakeEventsErrorsTooLarge, e.IntakeEventsErrorsTooLarge},
		{TransactionsProcessed, e.TransactionsProcessed},
		{SpansProcessed, e.SpansProcessed},
		{MetricsProcessed, e.MetricsProcessed},
		{ErrorsProcessed, e.ErrorsProcessed},
		{ErrorElasticResponses, e.ErrorElasticResponses},
		{ErrorOTLPTracesResponses, e.ErrorOTLPTracesResponses},
		{ErrorOTLPMetricsResponses, e.ErrorOTLPMetricsResponses},
		{NumGC, int64(e.NumGC)},
		{MemAllocs, int64(e.Mallocs)},
		{MemBytes, int64(e.TotalAlloc)},
		{HeapAlloc, int64(e.HeapAlloc)},
		{HeapObjects, int64(e.HeapObjects)},
		{TBSLsmSize, e.TBSLsmSize},
		{TBSVlogSize, e.TBSVlogSize},
	}

	c.l.Lock()
	defer c.l.Unlock()
	for _, r := range readings {
		c.processMetric(r.metric, r.value)
	}
}
// processMetric records val as a new sample of metric m. If a pending watch
// on m accepts val, true is delivered on the watch channel, the channel is
// closed and the watch is removed. The caller is expected to hold c.l (as
// accumulate does).
func (c *Collector) processMetric(m Metric, val int64) {
	c.metrics[m] = c.updateMetric(c.metrics[m], val)

	w, registered := c.watches[m]
	if !registered || !w.validator(val) {
		return
	}
	// The channel was created with a buffer of one and receives exactly one
	// value before being closed, so this send does not block under the lock.
	w.notifyChan <- true
	close(w.notifyChan)
	delete(c.watches, m)
}
// updateMetric returns stats extended with one more observation, value.
// The first observation seeds First, Min and Max; Mean is maintained as an
// incremental running mean. The receiver is not read or modified.
func (c *Collector) updateMetric(stats AggregateStats, value int64) AggregateStats {
	if stats.samples == 0 {
		stats.First = value
		stats.Min = value
		stats.Max = value
	}

	stats.samples++
	count := float64(stats.samples)
	stats.Mean += (float64(value) - stats.Mean) / count
	stats.Last = value

	if value < stats.Min {
		stats.Min = value
	}
	if value > stats.Max {
		stats.Max = value
	}
	return stats
}
// cleanup marks the collector as stopped and ends every pending watch by
// closing its channel without sending a value. It acquires c.l itself.
func (c *Collector) cleanup() {
	c.l.Lock()
	defer c.l.Unlock()

	c.stopped = true
	for metric := range c.watches {
		close(c.watches[metric].notifyChan)
		delete(c.watches, metric)
	}
}
// start takes an initial expvar snapshot synchronously and, if that succeeds,
// spawns a goroutine that accumulates the snapshots produced by run. The
// goroutine stops when ctx is cancelled, when a periodic query fails (the
// error is logged), or when run closes its snapshot channel. The collector is
// cleaned up whenever collection ends, including when the initial query
// fails, in which case that error is returned.
func (c *Collector) start(ctx context.Context, serverURL string, period time.Duration) error {
	var first expvar
	if err := queryExpvar(ctx, &first, serverURL); err != nil {
		c.cleanup()
		return err
	}
	c.accumulate(first)

	outChan, errChan := run(ctx, serverURL, period)
	go func() {
		defer c.cleanup()
		for {
			select {
			case <-ctx.Done():
				return
			case err := <-errChan:
				c.logger.Warn("failed while querying expvar", zap.Error(err))
				return
			case m, ok := <-outChan:
				// run closes outChan when it exits (e.g. after cancellation).
				// Receiving from the closed channel yields a zero-valued
				// snapshot, which must not be accumulated as a real sample.
				if !ok {
					return
				}
				c.accumulate(m)
			}
		}
	}()
	return nil
}
// run polls the expvar endpoint at serverURL every period from a background
// goroutine. Each query is bounded by a timeout equal to period. Successful
// snapshots are sent on the first returned channel. The first query error is
// sent on the second channel and ends polling. The goroutine also exits when
// ctx is cancelled. The snapshot channel is closed on exit; the error channel
// is never closed.
func run(ctx context.Context, serverURL string, period time.Duration) (<-chan expvar, <-chan error) {
	snapshots := make(chan expvar)
	failures := make(chan error)

	go func() {
		tick := time.NewTicker(period)
		// Deferred calls run in reverse: the ticker is stopped first, then
		// the snapshot channel is closed.
		defer close(snapshots)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}

			var snap expvar
			queryCtx, cancel := context.WithTimeout(ctx, period)
			err := queryExpvar(queryCtx, &snap, serverURL)
			cancel()

			if err != nil {
				select {
				case failures <- err:
				case <-ctx.Done():
				}
				return
			}

			select {
			case snapshots <- snap:
			case <-ctx.Done():
				return
			}
		}
	}()

	return snapshots, failures
}