-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathAPM.swift
794 lines (623 loc) · 29.2 KB
/
APM.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
import CLibMongoC
import Foundation
/// Implemented by objects that want to receive `CommandEvent`s.
public protocol CommandEventHandler: AnyObject {
    /// Called with a `CommandEvent` that was published to this handler.
    ///
    /// - Parameter event: The command monitoring event to handle.
    func handleCommandEvent(_ event: CommandEvent)
}
/// Implemented by objects that want to receive SDAM-related events.
public protocol SDAMEventHandler: AnyObject {
    /// Called with an `SDAMEvent` that was published to this handler.
    ///
    /// - Parameter event: The SDAM monitoring event to handle.
    func handleSDAMEvent(_ event: SDAMEvent)
}
/// Adopted by event types that users can consume directly.
private protocol Publishable {
    /// Publishes this event to the given client.
    ///
    /// - Parameter client: The client the event is published to.
    func publish(to client: MongoClient)
}
/// Adopted by monitoring events. A conforming event can be built from a libmongoc event wrapper and
/// can be packaged into a type that is publishable to users.
private protocol MongoSwiftEvent {
    /// The libmongoc event wrapper this event is initialized from.
    associatedtype MongocEventType: MongocEvent

    /// The publishable type this event is packaged into.
    associatedtype PublishableEventType: Publishable

    /// The kind of monitoring this event is associated with.
    static var monitoringComponent: MonitoringComponent { get }

    /// Creates the event from its libmongoc counterpart.
    init(mongocEvent: MongocEventType)

    /// Packages this event into its publishable form.
    func toPublishable() -> PublishableEventType
}
/// The kind of monitoring an event is associated with.
private enum MonitoringComponent {
    /// Command monitoring.
    case command
    /// Server discovery and monitoring (SDAM).
    case sdam
}
/// Adopted by the wrappers around libmongoc event pointers.
private protocol MongocEvent {
    /// Wraps the given libmongoc event pointer.
    init(_ eventPtr: OpaquePointer)

    /// The context pointer associated with the wrapped event, if any.
    var context: UnsafeMutableRawPointer? { get }
}
/// A command monitoring event.
public enum CommandEvent: Publishable {
    /// An event published when a command starts.
    case started(CommandStartedEvent)
    /// An event published when a command succeeds.
    case succeeded(CommandSucceededEvent)
    /// An event published when a command fails.
    case failed(CommandFailedEvent)

    /// The wrapped event, viewed through the fields every command event shares.
    private var event: CommandEventProtocol {
        switch self {
        case .started(let started):
            return started
        case .succeeded(let succeeded):
            return succeeded
        case .failed(let failed):
            return failed
        }
    }

    /// Passes this event to each of the client's command event handlers.
    fileprivate func publish(to client: MongoClient) {
        client.commandEventHandlers.forEach { $0.handleCommandEvent(self) }
    }

    /// The name of the command that generated this event.
    public var commandName: String {
        return self.event.commandName
    }

    /// The driver generated request ID.
    public var requestID: Int64 {
        return self.event.requestID
    }

    /// The driver generated operation ID. This is used to link events together such
    /// as bulk write operations.
    public var operationID: Int64 {
        return self.event.operationID
    }

    /// The address of the server the command was run against.
    public var serverAddress: ServerAddress {
        return self.event.serverAddress
    }

    /// The service ID for the command, if the driver is in load balancer mode.
    public var serviceID: BSONObjectID? {
        return self.event.serviceID
    }
}
/// Adopted by command monitoring events; declares the fields that all of them share.
private protocol CommandEventProtocol {
    /// The name of the command.
    var commandName: String { get }

    /// The driver generated request ID.
    var requestID: Int64 { get }

    /// The driver generated operation ID. This is used to link events together such
    /// as bulk write operations.
    var operationID: Int64 { get }

    /// The address of the server the command was run against.
    var serverAddress: ServerAddress { get }

    /// The service ID for the command, if the driver is in load balancer mode.
    var serviceID: BSONObjectID? { get }
}
/// An event published when a command starts.
public struct CommandStartedEvent: MongoSwiftEvent, CommandEventProtocol {
    /// Wrapper around a `mongoc_apm_command_started_t`.
    fileprivate struct MongocCommandStartedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_command_started_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .command
    }

    /// The command.
    public let command: BSONDocument

    /// The database name.
    public let databaseName: String

    /// The command name.
    public let commandName: String

    /// The driver generated request id.
    public let requestID: Int64

    /// The driver generated operation id. This is used to link events together such
    /// as bulk write operations.
    public let operationID: Int64

    /// The address of the server the command was run against.
    public let serverAddress: ServerAddress

    /// The service ID for the command, if the driver is in load balancer mode.
    public let serviceID: BSONObjectID?

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocCommandStartedEvent) {
        let raw = mongocEvent.ptr
        // libmongoc owns the command document, so it is copied rather than referenced.
        self.command = BSONDocument(copying: mongoc_apm_command_started_get_command(raw))
        self.databaseName = String(cString: mongoc_apm_command_started_get_database_name(raw))
        self.commandName = String(cString: mongoc_apm_command_started_get_command_name(raw))
        self.requestID = mongoc_apm_command_started_get_request_id(raw)
        self.operationID = mongoc_apm_command_started_get_operation_id(raw)
        self.serverAddress = ServerAddress(mongoc_apm_command_started_get_host(raw))
        // A nil pointer from libmongoc means there is no service ID.
        self.serviceID = mongoc_apm_command_started_get_service_id(raw).map { oidPtr in
            BSONObjectID(bsonOid: oidPtr.pointee)
        }
    }

    fileprivate func toPublishable() -> CommandEvent {
        return .started(self)
    }
}
/// An event published when a command succeeds.
public struct CommandSucceededEvent: MongoSwiftEvent, CommandEventProtocol {
    /// Wrapper around a `mongoc_apm_command_succeeded_t`.
    fileprivate struct MongocCommandSucceededEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_command_succeeded_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .command
    }

    /// The execution time of the event, in microseconds.
    public let duration: Int

    /// The command reply.
    public let reply: BSONDocument

    /// The command name.
    public let commandName: String

    /// The driver generated request id.
    public let requestID: Int64

    /// The driver generated operation id. This is used to link events together such
    /// as bulk write operations.
    public let operationID: Int64

    /// The address of the server the command was run against.
    public let serverAddress: ServerAddress

    /// The service ID for the command, if the driver is in load balancer mode.
    public let serviceID: BSONObjectID?

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocCommandSucceededEvent) {
        let raw = mongocEvent.ptr
        // TODO: SWIFT-349 add logging to check and warn of unlikely int size issues
        self.duration = Int(mongoc_apm_command_succeeded_get_duration(raw))
        // libmongoc owns the reply document, so it is copied rather than referenced.
        self.reply = BSONDocument(copying: mongoc_apm_command_succeeded_get_reply(raw))
        self.commandName = String(cString: mongoc_apm_command_succeeded_get_command_name(raw))
        self.requestID = mongoc_apm_command_succeeded_get_request_id(raw)
        self.operationID = mongoc_apm_command_succeeded_get_operation_id(raw)
        self.serverAddress = ServerAddress(mongoc_apm_command_succeeded_get_host(raw))
        // A nil pointer from libmongoc means there is no service ID.
        self.serviceID = mongoc_apm_command_succeeded_get_service_id(raw).map { oidPtr in
            BSONObjectID(bsonOid: oidPtr.pointee)
        }
    }

    fileprivate func toPublishable() -> CommandEvent {
        return .succeeded(self)
    }
}
/// An event published when a command fails.
public struct CommandFailedEvent: MongoSwiftEvent, CommandEventProtocol {
    /// Wrapper around a `mongoc_apm_command_failed_t`.
    fileprivate struct MongocCommandFailedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_command_failed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .command
    }

    /// The execution time of the event, in microseconds.
    public let duration: Int

    /// The command name.
    public let commandName: String

    /// The failure, represented as a MongoErrorProtocol.
    public let failure: MongoErrorProtocol

    /// The driver generated request id.
    public let requestID: Int64

    /// The driver generated operation id. This is used to link events together such
    /// as bulk write operations.
    public let operationID: Int64

    /// The address of the server the command was run against.
    public let serverAddress: ServerAddress

    /// The service ID for the command, if the driver is in load balancer mode.
    public let serviceID: BSONObjectID?

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocCommandFailedEvent) {
        let raw = mongocEvent.ptr
        self.duration = Int(mongoc_apm_command_failed_get_duration(raw))
        self.commandName = String(cString: mongoc_apm_command_failed_get_command_name(raw))

        // The failure is built from the libmongoc error together with a copy of the server reply.
        var error = bson_error_t()
        mongoc_apm_command_failed_get_error(raw, &error)
        let reply = BSONDocument(copying: mongoc_apm_command_failed_get_reply(raw))
        self.failure = extractMongoError(error: error, reply: reply) // should always return a CommandError

        self.requestID = mongoc_apm_command_failed_get_request_id(raw)
        self.operationID = mongoc_apm_command_failed_get_operation_id(raw)
        self.serverAddress = ServerAddress(mongoc_apm_command_failed_get_host(raw))
        // A nil pointer from libmongoc means there is no service ID.
        self.serviceID = mongoc_apm_command_failed_get_service_id(raw).map { oidPtr in
            BSONObjectID(bsonOid: oidPtr.pointee)
        }
    }

    fileprivate func toPublishable() -> CommandEvent {
        return .failed(self)
    }
}
/// An SDAM monitoring event related to topology updates.
public enum SDAMEvent: Publishable {
    /// Published when a topology description changes.
    case topologyDescriptionChanged(TopologyDescriptionChangedEvent)

    /// Published when a topology is initialized.
    case topologyOpening(TopologyOpeningEvent)

    /// Published when a topology is shut down.
    case topologyClosed(TopologyClosedEvent)

    /// Published when a topology's information about a server changes.
    case serverDescriptionChanged(ServerDescriptionChangedEvent)

    /// Published when information about a new server is discovered.
    case serverOpening(ServerOpeningEvent)

    /// Published when a server is removed from a topology and no longer monitored.
    case serverClosed(ServerClosedEvent)

    /// Published when the server monitor's "hello" command is started - immediately before
    /// the "hello" command is serialized into raw BSON and written to the socket.
    case serverHeartbeatStarted(ServerHeartbeatStartedEvent)

    /// Published when the server monitor's "hello" command succeeds.
    case serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent)

    /// Published when the server monitor's "hello" command fails, either with an "ok: 0" or a socket exception.
    case serverHeartbeatFailed(ServerHeartbeatFailedEvent)

    /// Passes this event to each of the client's SDAM event handlers.
    fileprivate func publish(to client: MongoClient) {
        client.sdamEventHandlers.forEach { $0.handleSDAMEvent(self) }
    }
}
/// Published when a server description changes. This does NOT include changes to the server's roundTripTime property.
public struct ServerDescriptionChangedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_changed_t`.
    fileprivate struct MongocServerChangedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_server_changed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// The connection ID (host/port pair) of the server.
    public let serverAddress: ServerAddress

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// The previous server description.
    public let previousDescription: ServerDescription

    /// The new server description.
    public let newDescription: ServerDescription

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerChangedEvent) {
        let raw = mongocEvent.ptr
        self.serverAddress = ServerAddress(mongoc_apm_server_changed_get_host(raw))

        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var topologyOID = bson_oid_t()
        mongoc_apm_server_changed_get_topology_id(raw, &topologyOID)
        self.topologyID = BSONObjectID(bsonOid: topologyOID)

        let previous = mongoc_apm_server_changed_get_previous_description(raw)
        self.previousDescription = ServerDescription(previous)
        let updated = mongoc_apm_server_changed_get_new_description(raw)
        self.newDescription = ServerDescription(updated)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .serverDescriptionChanged(self)
    }
}
/// Published when a server is initialized.
public struct ServerOpeningEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_opening_t`.
    fileprivate struct MongocServerOpeningEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            mongoc_apm_server_opening_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent { .sdam }

    /// The connection ID (host/port pair) of the server.
    public let serverAddress: ServerAddress

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerOpeningEvent) {
        self.serverAddress = ServerAddress(mongoc_apm_server_opening_get_host(mongocEvent.ptr))
        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var oid = bson_oid_t()
        withUnsafeMutablePointer(to: &oid) { oidPtr in
            mongoc_apm_server_opening_get_topology_id(mongocEvent.ptr, oidPtr)
        }
        self.topologyID = BSONObjectID(bsonOid: oid)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        .serverOpening(self)
    }
}
/// Published when a server is closed.
public struct ServerClosedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_closed_t`.
    fileprivate struct MongocServerClosedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            mongoc_apm_server_closed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent { .sdam }

    /// The connection ID (host/port pair) of the server.
    public let serverAddress: ServerAddress

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerClosedEvent) {
        self.serverAddress = ServerAddress(mongoc_apm_server_closed_get_host(mongocEvent.ptr))
        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var oid = bson_oid_t()
        withUnsafeMutablePointer(to: &oid) { oidPtr in
            mongoc_apm_server_closed_get_topology_id(mongocEvent.ptr, oidPtr)
        }
        self.topologyID = BSONObjectID(bsonOid: oid)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        .serverClosed(self)
    }
}
/// Published when a topology description changes.
public struct TopologyDescriptionChangedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_topology_changed_t`.
    fileprivate struct MongocTopologyChangedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_topology_changed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// The old topology description.
    public let previousDescription: TopologyDescription

    /// The new topology description.
    public let newDescription: TopologyDescription

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocTopologyChangedEvent) {
        let raw = mongocEvent.ptr

        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var topologyOID = bson_oid_t()
        mongoc_apm_topology_changed_get_topology_id(raw, &topologyOID)
        self.topologyID = BSONObjectID(bsonOid: topologyOID)

        let previous = mongoc_apm_topology_changed_get_previous_description(raw)
        self.previousDescription = TopologyDescription(previous)
        let updated = mongoc_apm_topology_changed_get_new_description(raw)
        self.newDescription = TopologyDescription(updated)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .topologyDescriptionChanged(self)
    }
}
/// Published when a topology is initialized.
public struct TopologyOpeningEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_topology_opening_t`.
    fileprivate struct MongocTopologyOpeningEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_topology_opening_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// Reads the topology ID out of the libmongoc event.
    fileprivate init(mongocEvent: MongocTopologyOpeningEvent) {
        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var topologyOID = bson_oid_t()
        mongoc_apm_topology_opening_get_topology_id(mongocEvent.ptr, &topologyOID)
        self.topologyID = BSONObjectID(bsonOid: topologyOID)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .topologyOpening(self)
    }
}
/// Published when a topology is closed.
public struct TopologyClosedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_topology_closed_t`.
    fileprivate struct MongocTopologyClosedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_topology_closed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// A unique identifier for the topology.
    public let topologyID: BSONObjectID

    /// Reads the topology ID out of the libmongoc event.
    fileprivate init(mongocEvent: MongocTopologyClosedEvent) {
        // libmongoc writes the topology ID into the caller-provided bson_oid_t.
        var topologyOID = bson_oid_t()
        mongoc_apm_topology_closed_get_topology_id(mongocEvent.ptr, &topologyOID)
        self.topologyID = BSONObjectID(bsonOid: topologyOID)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .topologyClosed(self)
    }
}
/// Published when the server monitor's "hello" command is started - immediately before
/// the "hello" command is serialized into raw BSON and written to the socket.
public struct ServerHeartbeatStartedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_heartbeat_started_t`.
    fileprivate struct MongocServerHeartbeatStartedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_server_heartbeat_started_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// The address of the server.
    public let serverAddress: ServerAddress

    /// Reads the server address out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerHeartbeatStartedEvent) {
        let host = mongoc_apm_server_heartbeat_started_get_host(mongocEvent.ptr)
        self.serverAddress = ServerAddress(host)
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .serverHeartbeatStarted(self)
    }
}
/// Published when the server monitor's "hello" command succeeds.
public struct ServerHeartbeatSucceededEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_heartbeat_succeeded_t`.
    fileprivate struct MongocServerHeartbeatSucceededEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_server_heartbeat_succeeded_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// The execution time of the event, in microseconds.
    public let duration: Int

    /// The command reply.
    public let reply: BSONDocument

    /// The address of the server.
    public let serverAddress: ServerAddress

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerHeartbeatSucceededEvent) {
        let raw = mongocEvent.ptr
        self.duration = Int(mongoc_apm_server_heartbeat_succeeded_get_duration(raw))
        // libmongoc owns the reply document, so it is copied rather than referenced.
        self.reply = BSONDocument(copying: mongoc_apm_server_heartbeat_succeeded_get_reply(raw))
        self.serverAddress = ServerAddress(mongoc_apm_server_heartbeat_succeeded_get_host(raw))
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .serverHeartbeatSucceeded(self)
    }
}
/// Published when the server monitor's "hello" command fails, either with an "ok: 0" or a socket exception.
public struct ServerHeartbeatFailedEvent: MongoSwiftEvent {
    /// Wrapper around a `mongoc_apm_server_heartbeat_failed_t`.
    fileprivate struct MongocServerHeartbeatFailedEvent: MongocEvent {
        /// The wrapped libmongoc event pointer.
        fileprivate let ptr: OpaquePointer

        fileprivate init(_ eventPtr: OpaquePointer) {
            self.ptr = eventPtr
        }

        /// The context pointer libmongoc reports for this event.
        fileprivate var context: UnsafeMutableRawPointer? {
            return mongoc_apm_server_heartbeat_failed_get_context(self.ptr)
        }
    }

    fileprivate static var monitoringComponent: MonitoringComponent {
        return .sdam
    }

    /// The execution time of the event, in microseconds.
    public let duration: Int

    /// The failure.
    public let failure: MongoErrorProtocol

    /// The address of the server.
    public let serverAddress: ServerAddress

    /// Reads every field out of the libmongoc event.
    fileprivate init(mongocEvent: MongocServerHeartbeatFailedEvent) {
        let raw = mongocEvent.ptr
        self.duration = Int(mongoc_apm_server_heartbeat_failed_get_duration(raw))

        // The failure is built from the error libmongoc recorded for this heartbeat.
        var error = bson_error_t()
        mongoc_apm_server_heartbeat_failed_get_error(raw, &error)
        self.failure = extractMongoError(error: error)

        self.serverAddress = ServerAddress(mongoc_apm_server_heartbeat_failed_get_host(raw))
    }

    fileprivate func toPublishable() -> SDAMEvent {
        return .serverHeartbeatFailed(self)
    }
}
// The functions below are the callbacks installed for the libmongoc events with the corresponding
// names. Each one hands the raw libmongoc event to `publishEvent`, which parses it and publishes
// the result to the registered event handlers.

/// Callback for "command started" events; publishes a `CommandStartedEvent`.
private func commandStarted(_ event: OpaquePointer?) {
    publishEvent(type: CommandStartedEvent.self, eventPtr: event)
}
/// Callback for "command succeeded" events; publishes a `CommandSucceededEvent`.
private func commandSucceeded(_ event: OpaquePointer?) {
    publishEvent(type: CommandSucceededEvent.self, eventPtr: event)
}
/// Callback for "command failed" events; publishes a `CommandFailedEvent`.
private func commandFailed(_ event: OpaquePointer?) {
    publishEvent(type: CommandFailedEvent.self, eventPtr: event)
}
/// Callback for "server description changed" events; publishes a `ServerDescriptionChangedEvent`.
private func serverDescriptionChanged(_ event: OpaquePointer?) {
    publishEvent(type: ServerDescriptionChangedEvent.self, eventPtr: event)
}
/// Callback for "server opening" events; publishes a `ServerOpeningEvent`.
private func serverOpening(_ event: OpaquePointer?) {
    publishEvent(type: ServerOpeningEvent.self, eventPtr: event)
}
/// Callback for "server closed" events; publishes a `ServerClosedEvent`.
private func serverClosed(_ event: OpaquePointer?) {
    publishEvent(type: ServerClosedEvent.self, eventPtr: event)
}
/// Callback for "topology description changed" events; publishes a `TopologyDescriptionChangedEvent`.
private func topologyDescriptionChanged(_ event: OpaquePointer?) {
    publishEvent(type: TopologyDescriptionChangedEvent.self, eventPtr: event)
}
/// Callback for "topology opening" events; publishes a `TopologyOpeningEvent`.
private func topologyOpening(_ event: OpaquePointer?) {
    publishEvent(type: TopologyOpeningEvent.self, eventPtr: event)
}
/// Callback for "topology closed" events; publishes a `TopologyClosedEvent`.
private func topologyClosed(_ event: OpaquePointer?) {
    publishEvent(type: TopologyClosedEvent.self, eventPtr: event)
}
/// Callback for "server heartbeat started" events; publishes a `ServerHeartbeatStartedEvent`.
private func serverHeartbeatStarted(_ event: OpaquePointer?) {
    publishEvent(type: ServerHeartbeatStartedEvent.self, eventPtr: event)
}
/// Callback for "server heartbeat succeeded" events; publishes a `ServerHeartbeatSucceededEvent`.
private func serverHeartbeatSucceeded(_ event: OpaquePointer?) {
    publishEvent(type: ServerHeartbeatSucceededEvent.self, eventPtr: event)
}
/// Callback for "server heartbeat failed" events; publishes a `ServerHeartbeatFailedEvent`.
private func serverHeartbeatFailed(_ event: OpaquePointer?) {
    publishEvent(type: ServerHeartbeatFailedEvent.self, eventPtr: event)
}
/// Converts a libmongoc event into its Swift counterpart and publishes it to the client that the
/// event's context points at.
///
/// Traps with `fatalError` if the event pointer or its context is missing. Returns without building
/// a Swift event when the client has no handlers registered for the event's monitoring component.
///
/// - Parameters:
///   - type: The Swift event type to build from the libmongoc event.
///   - eventPtr: The libmongoc event pointer received by the callback.
private func publishEvent<T: MongoSwiftEvent>(type: T.Type, eventPtr: OpaquePointer?) {
    guard let rawEvent = eventPtr else {
        fatalError("Missing event pointer for \(type)")
    }

    // The underlying pointer is only valid within the registered callback, so the wrapper must not
    // escape this scope.
    let wrapped = type.MongocEventType(rawEvent)

    guard let clientContext = wrapped.context else {
        fatalError("Missing context for \(type)")
    }
    let client = Unmanaged<MongoClient>.fromOpaque(clientContext).takeUnretainedValue()

    // Swift events are only created when handlers are registered for this kind of event.
    let hasHandlers: Bool
    switch type.monitoringComponent {
    case .sdam:
        hasHandlers = !client.sdamEventHandlers.isEmpty
    case .command:
        hasHandlers = !client.commandEventHandlers.isEmpty
    }
    guard hasHandlers else {
        return
    }

    let publishable = type.init(mongocEvent: wrapped).toPublishable()
    publishable.publish(to: client)
}
/// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring.
extension ConnectionPool {
    /// Internal function to install monitoring callbacks for this pool. **This method may only be called before any
    /// connections are checked out from the pool.**
    ///
    /// Traps with `fatalError` if libmongoc cannot allocate the callbacks struct.
    ///
    /// - Parameter client: The client passed along to `setAPMCallbacks` together with the callbacks.
    internal func initializeMonitoring(client: MongoClient) {
        guard let callbacks = mongoc_apm_callbacks_new() else {
            fatalError("failed to initialize new mongoc_apm_callbacks_t")
        }
        // The struct allocated by `mongoc_apm_callbacks_new` must be released by the caller.
        // NOTE(review): assumes `setAPMCallbacks` hands the struct to libmongoc synchronously (libmongoc copies
        // it) and keeps no reference to `callbacks` afterwards - confirm against its implementation.
        defer { mongoc_apm_callbacks_destroy(callbacks) }

        // Command monitoring.
        mongoc_apm_set_command_started_cb(callbacks, commandStarted)
        mongoc_apm_set_command_succeeded_cb(callbacks, commandSucceeded)
        mongoc_apm_set_command_failed_cb(callbacks, commandFailed)

        // Server and topology monitoring.
        mongoc_apm_set_server_changed_cb(callbacks, serverDescriptionChanged)
        mongoc_apm_set_server_opening_cb(callbacks, serverOpening)
        mongoc_apm_set_server_closed_cb(callbacks, serverClosed)
        mongoc_apm_set_topology_changed_cb(callbacks, topologyDescriptionChanged)
        mongoc_apm_set_topology_opening_cb(callbacks, topologyOpening)
        mongoc_apm_set_topology_closed_cb(callbacks, topologyClosed)

        // Server heartbeats.
        mongoc_apm_set_server_heartbeat_started_cb(callbacks, serverHeartbeatStarted)
        mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded)
        mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed)

        self.setAPMCallbacks(callbacks: callbacks, client: client)
    }
}