-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathMongoCollection+ChangeStreams.swift
117 lines (114 loc) · 5.46 KB
/
MongoCollection+ChangeStreams.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import MongoSwift
extension MongoCollection {
/**
* Starts a `ChangeStream` on a collection. The `CollectionType` will be associated with the `fullDocument`
* field in `ChangeStreamEvent`s emitted by the returned `ChangeStream`. The server will return an error if
* this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
*
* - Returns: A `ChangeStream` on a specific collection.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil
) throws -> ChangeStream<ChangeStreamEvent<CollectionType>> {
try self.watch(
pipeline,
options: options,
session: session,
withEventType: ChangeStreamEvent<CollectionType>.self
)
}
/**
* Starts a `ChangeStream` on a collection. Associates the specified `Codable` type `T` with the `fullDocument`
* field in the `ChangeStreamEvent`s emitted by the returned `ChangeStream`. The server will return an error
* if this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withFullDocumentType: The type that the `fullDocument` field of the emitted `ChangeStreamEvent`s will be
* decoded to.
*
* - Returns: A `ChangeStream` on a specific collection.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch<FullDocType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withFullDocumentType _: FullDocType.Type
) throws -> ChangeStream<ChangeStreamEvent<FullDocType>> {
try self.watch(
pipeline,
options: options,
session: session,
withEventType: ChangeStreamEvent<FullDocType>.self
)
}
/**
* Starts a `ChangeStream` on a collection. Associates the specified `Codable` type `T` with the returned
* `ChangeStream`. The server will return an error if this method is called on a system collection.
*
* - Parameters:
* - pipeline: An array of aggregation pipeline stages to apply to the events returned by the change stream.
* - options: An optional `ChangeStreamOptions` to use when constructing the change stream.
* - session: An optional `ClientSession` to use with this change stream.
* - withEventType: The type that the entire change stream response will be decoded to and that will be returned
* when iterating through the change stream.
*
* - Returns: A `ChangeStream` on a specific collection.
*
* - Throws:
* - `MongoError.CommandError` if an error occurs on the server while creating the change stream.
* - `MongoError.InvalidArgumentError` if the options passed formed an invalid combination.
* - `MongoError.InvalidArgumentError` if the `_id` field is projected out of the change stream documents by the
* pipeline.
*
* - SeeAlso:
* - https://docs.mongodb.com/manual/changeStreams/
* - https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
* - https://docs.mongodb.com/manual/reference/system-collections/
*/
public func watch<EventType: Codable>(
_ pipeline: [BSONDocument] = [],
options: ChangeStreamOptions? = nil,
session: ClientSession? = nil,
withEventType _: EventType.Type
) throws -> ChangeStream<EventType> {
let asyncStream = try self.asyncColl.watch(
pipeline,
options: options,
session: session?.asyncSession,
withEventType: EventType.self
).wait()
return ChangeStream(wrapping: asyncStream, client: self.client)
}
}