-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathMongoCollection+ChangeStreams.swift
132 lines (129 loc) · 6.57 KB
/
MongoCollection+ChangeStreams.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import CLibMongoC
import NIO
extension MongoCollection {
/**
* Starts a `ChangeStream` on this collection. The `CollectionType` will be associated with the `fullDocument`
* field in `ChangeStreamEvent`s emitted by the returned `ChangeStream`. The server will return an error if
* this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
*
* - Warning:
* If the returned change stream is alive when it goes out of scope, it will leak resources. To ensure the
* change stream is dead before it leaves scope, invoke `ChangeStream.kill(...)` on it.
*
* - Returns:
* An `EventLoopFuture<ChangeStream>`. On success, contains a `ChangeStream` watching this collection.
*
* If the future fails, the error is likely one of the following:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil
) -> EventLoopFuture<ChangeStream<ChangeStreamEvent<CollectionType>>> {
self.watch(pipeline, options: options, session: session, withFullDocumentType: CollectionType.self)
}
/**
* Starts a `ChangeStream` on this collection. Associates the specified `Codable` type `T` with the `fullDocument`
* field in the `ChangeStreamEvent`s emitted by the returned `ChangeStream`. The server will return an error
* if this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withFullDocumentType: The type that the `fullDocument` field of the emitted `ChangeStreamEvent`s will be
* decoded to.
*
* - Warning:
* If the returned change stream is alive when it goes out of scope, it will leak resources. To ensure the
* change stream is dead before it leaves scope, invoke `ChangeStream.kill(...)` on it.
*
* - Returns:
* An `EventLoopFuture<ChangeStream>`. On success, contains a `ChangeStream` watching this collection.
*
* If the future fails, the error is likely one of the following:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch<FullDocType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withFullDocumentType _: FullDocType.Type
) -> EventLoopFuture<ChangeStream<ChangeStreamEvent<FullDocType>>> {
self.watch(
pipeline,
options: options,
session: session,
withEventType: ChangeStreamEvent<FullDocType>.self
)
}
/**
* Starts a `ChangeStream` on this collection. Associates the specified `Codable` type `T` with the returned
* `ChangeStream`. The server will return an error if this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withEventType: The type that the entire change stream response will be decoded to and that will be returned
* when iterating through the change stream.
*
* - Warning:
* If the returned change stream is alive when it goes out of scope, it will leak resources. To ensure the
* change stream is dead before it leaves scope, invoke `ChangeStream.kill(...)` on it.
*
* - Returns:
* An `EventLoopFuture<ChangeStream>`. On success, contains a `ChangeStream` watching this collection.
*
* If the future fails, the error is likely one of the following:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch<EventType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withEventType _: EventType.Type
) -> EventLoopFuture<ChangeStream<EventType>> {
let operation = WatchOperation<CollectionType, EventType>(
target: .collection(self),
pipeline: pipeline,
options: options
)
return self._client.operationExecutor.execute(
operation,
client: self._client,
on: self.eventLoop,
session: session
)
}
}