-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathMongoCollection+BulkWrite.swift
514 lines (455 loc) · 21.2 KB
/
MongoCollection+BulkWrite.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
import CLibMongoC
import NIO
/// Bulk write support for `MongoCollection`.
extension MongoCollection {
    /**
     * Runs a batch of write operations against this collection.
     *
     * - Parameters:
     *   - requests: a `[WriteModel]` describing the writes to perform. Must not be empty.
     *   - options: optional `BulkWriteOptions` to use while executing the operation.
     *   - session: optional `ClientSession` to use when executing this command.
     *
     * - Returns:
     *    An `EventLoopFuture<BulkWriteResult?>`. On success, the future contains either a `BulkWriteResult`, or
     *    contains `nil` if the write concern is unacknowledged.
     *
     *    If the future fails, the error is likely one of the following:
     *    - `MongoError.InvalidArgumentError` if `requests` is empty.
     *    - `MongoError.LogicError` if the provided session is inactive.
     *    - `MongoError.LogicError` if this collection's parent client has already been closed.
     *    - `MongoError.BulkWriteError` if any error occurs while performing the writes. This includes errors that
     *      would typically be propagated as `RuntimeError`s or `MongoError.CommandError`s elsewhere.
     *    - `EncodingError` if an error occurs while encoding the `CollectionType` or the options to BSON.
     */
    public func bulkWrite(
        _ requests: [WriteModel<T>],
        options: BulkWriteOptions? = nil,
        session: ClientSession? = nil
    ) -> EventLoopFuture<BulkWriteResult?> {
        let executor = self._client.operationExecutor

        // An empty batch is rejected up front with an already-failed future; no operation is built.
        if requests.isEmpty {
            return executor.makeFailedFuture(
                MongoError.InvalidArgumentError(message: "requests cannot be empty"),
                on: self.eventLoop
            )
        }

        return executor.execute(
            BulkWriteOperation(collection: self, models: requests, options: options),
            client: self._client,
            on: self.eventLoop,
            session: session
        )
    }
}
/// The individual write operations that may be submitted as part of a `bulkWrite`.
public enum WriteModel<CollectionType: Codable> {
    /// A `deleteOne`.
    /// Associated values:
    /// - A `BSONDocument` representing the match criteria.
    /// - `options`: Optional `DeleteModelOptions`.
    case deleteOne(BSONDocument, options: DeleteModelOptions? = nil)

    /// A `deleteMany`.
    /// Associated values:
    /// - A `BSONDocument` representing the match criteria.
    /// - `options`: Optional `DeleteModelOptions`.
    case deleteMany(BSONDocument, options: DeleteModelOptions? = nil)

    /// An `insertOne`.
    /// Associated values:
    /// - A `CollectionType` value to insert.
    case insertOne(CollectionType)

    /// A `replaceOne`.
    /// Associated values:
    /// - `filter`: A `BSONDocument` representing the match criteria.
    /// - `replacement`: A `CollectionType` value to use as the replacement.
    /// - `options`: Optional `ReplaceOneModelOptions`.
    case replaceOne(filter: BSONDocument, replacement: CollectionType, options: ReplaceOneModelOptions? = nil)

    /// An `updateOne`.
    /// Associated values:
    /// - `filter`: A `BSONDocument` representing the match criteria.
    /// - `update`: A `BSONDocument` containing update operators.
    /// - `options`: Optional `UpdateModelOptions`.
    case updateOne(filter: BSONDocument, update: BSONDocument, options: UpdateModelOptions? = nil)

    /// An `updateMany`.
    /// Associated values:
    /// - `filter`: A `BSONDocument` representing the match criteria.
    /// - `update`: A `BSONDocument` containing update operators.
    /// - `options`: Optional `UpdateModelOptions`.
    case updateMany(filter: BSONDocument, update: BSONDocument, options: UpdateModelOptions? = nil)

    /// Appends this model to the given libmongoc bulk operation (`mongoc_bulk_operation_t`), encoding any options
    /// and `CollectionType` values with the supplied encoder.
    ///
    /// - Parameters:
    ///   - bulk: pointer to the bulk operation the write is appended to.
    ///   - encoder: the `BSONEncoder` used for options and `CollectionType` values.
    ///
    /// - Returns: the `_id` of the document to insert when this is an `insertOne`; `nil` for every other case.
    ///
    /// - Throws: any error raised while encoding, or the error extracted from libmongoc's `bson_error_t` when the
    ///   underlying `mongoc_bulk_operation_*` call reports failure.
    fileprivate func addToBulkWrite(_ bulk: OpaquePointer, encoder: BSONEncoder) throws -> BSON? {
        var error = bson_error_t()
        var insertedID: BSON?
        let added: Bool

        switch self {
        case let .deleteOne(filter, options):
            let encodedOptions = try encoder.encode(options)
            added = filter.withBSONPointer { filterPointer in
                withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                    mongoc_bulk_operation_remove_one_with_opts(bulk, filterPointer, optionsPointer, &error)
                }
            }

        case let .deleteMany(filter, options):
            let encodedOptions = try encoder.encode(options)
            added = filter.withBSONPointer { filterPointer in
                withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                    mongoc_bulk_operation_remove_many_with_opts(bulk, filterPointer, optionsPointer, &error)
                }
            }

        case let .insertOne(value):
            // `withID()` is applied to the encoded document so that an `_id` can be reported back to the caller.
            let document: BSONDocument = try convertingBSONErrors { try encoder.encode(value).withID() }
            added = document.withBSONPointer { documentPointer in
                mongoc_bulk_operation_insert_with_opts(bulk, documentPointer, nil, &error)
            }
            guard let id = document["_id"] else {
                // we called `withID()`, so this should be present.
                fatalError("Failed to get value for _id from document")
            }
            insertedID = id

        case let .replaceOne(filter, replacement, options):
            let encodedReplacement = try encoder.encode(replacement)
            let encodedOptions = try encoder.encode(options)
            added = filter.withBSONPointer { filterPointer in
                encodedReplacement.withBSONPointer { replacementPointer in
                    withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                        mongoc_bulk_operation_replace_one_with_opts(
                            bulk,
                            filterPointer,
                            replacementPointer,
                            optionsPointer,
                            &error
                        )
                    }
                }
            }

        case let .updateOne(filter, update, options):
            let encodedOptions = try encoder.encode(options)
            added = filter.withBSONPointer { filterPointer in
                update.withBSONPointer { updatePointer in
                    withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                        mongoc_bulk_operation_update_one_with_opts(
                            bulk,
                            filterPointer,
                            updatePointer,
                            optionsPointer,
                            &error
                        )
                    }
                }
            }

        case let .updateMany(filter, update, options):
            let encodedOptions = try encoder.encode(options)
            added = filter.withBSONPointer { filterPointer in
                update.withBSONPointer { updatePointer in
                    withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                        mongoc_bulk_operation_update_many_with_opts(
                            bulk,
                            filterPointer,
                            updatePointer,
                            optionsPointer,
                            &error
                        )
                    }
                }
            }
        }

        if !added {
            throw extractMongoError(error: error) // should be invalidArgumentError
        }
        return insertedID
    }
}
/// Options accepted by `WriteModel.deleteOne` and `WriteModel.deleteMany`.
public struct DeleteModelOptions: Codable {
    /// The collation to use.
    public var collation: BSONDocument?

    /// A document or string that specifies the index to use to support the query. Only supported in server 4.4+.
    public var hint: IndexHint?

    /// Creates a set of delete options; every option may be omitted or passed as `nil`.
    ///
    /// - Parameters:
    ///   - collation: the collation to use.
    ///   - hint: the index to use to support the query.
    public init(
        collation: BSONDocument? = nil,
        hint: IndexHint? = nil
    ) {
        self.collation = collation
        self.hint = hint
    }
}
/// Options accepted by `WriteModel.replaceOne`.
public struct ReplaceOneModelOptions: Codable {
    /// The collation to use.
    public var collation: BSONDocument?

    /// A document or string that specifies the index to use to support the query. Only supported in server 4.2+.
    public var hint: IndexHint?

    /// When `true`, creates a new document if no document matches the query.
    public var upsert: Bool?

    /// Creates a set of replace options; every option may be omitted or passed as `nil`.
    ///
    /// - Parameters:
    ///   - collation: the collation to use.
    ///   - hint: the index to use to support the query.
    ///   - upsert: whether to create a new document when nothing matches the query.
    public init(
        collation: BSONDocument? = nil,
        hint: IndexHint? = nil,
        upsert: Bool? = nil
    ) {
        self.collation = collation
        self.hint = hint
        self.upsert = upsert
    }
}
/// Options accepted by `WriteModel.updateOne` and `WriteModel.updateMany`.
public struct UpdateModelOptions: Codable {
    /// A set of filters specifying to which array elements an update should apply.
    public var arrayFilters: [BSONDocument]?

    /// The collation to use.
    public var collation: BSONDocument?

    /// A document or string that specifies the index to use to support the query. Only supported in server 4.2+.
    public var hint: IndexHint?

    /// When `true`, creates a new document if no document matches the query.
    public var upsert: Bool?

    /// Creates a set of update options; every option may be omitted or passed as `nil`.
    ///
    /// - Parameters:
    ///   - arrayFilters: filters selecting the array elements an update applies to.
    ///   - collation: the collation to use.
    ///   - hint: the index to use to support the query.
    ///   - upsert: whether to create a new document when nothing matches the query.
    public init(
        arrayFilters: [BSONDocument]? = nil,
        collation: BSONDocument? = nil,
        hint: IndexHint? = nil,
        upsert: Bool? = nil
    ) {
        self.arrayFilters = arrayFilters
        self.collation = collation
        self.hint = hint
        self.upsert = upsert
    }
}
/// The operation backing a "bulkWrite" command on a collection.
internal struct BulkWriteOperation<T: Codable>: Operation {
    private let collection: MongoCollection<T>
    private let models: [WriteModel<T>]
    internal let options: BulkWriteOptions?
    fileprivate let encoder: BSONEncoder

    /// Captures the target collection, the write models and the options; the encoder is taken from the collection.
    fileprivate init(collection: MongoCollection<T>, models: [WriteModel<T>], options: BulkWriteOptions?) {
        self.collection = collection
        self.models = models
        self.options = options
        self.encoder = collection.encoder
    }

    /**
     * Executes the bulk write operation and returns a `BulkWriteResult` or
     * `nil` if the write concern is unacknowledged.
     *
     * - Parameters:
     *   - connection: the `Connection` used to obtain the underlying libmongoc collection.
     *   - session: optional `ClientSession` the operation runs under.
     *
     * - Throws:
     *   - `MongoError.InvalidArgumentError` if a write concern is set in the options while the session is in a
     *     transaction.
     *   - `MongoError.CommandError` if an error occurs that prevents the operation from executing.
     *   - `MongoError.BulkWriteError` if an error occurs while performing the writes.
     */
    internal func execute(using connection: Connection, session: ClientSession?) throws -> BulkWriteResult? {
        let encodedOptions = try encodeOptions(options: options, session: session)

        if session?.inTransaction == true && self.options?.writeConcern != nil {
            throw MongoError.InvalidArgumentError(
                message: "Cannot specify a write concern on an individual helper in a " +
                    "transaction. Instead specify it when starting the transaction."
            )
        }

        // Operation index -> `_id` for every `insertOne` model added below.
        var insertedIDs: [Int: BSON] = [:]

        return try self.collection.withMongocCollection(from: connection) { collPtr in
            let bulk: OpaquePointer = withOptionalBSONPointer(to: encodedOptions) { optionsPointer in
                guard let created = mongoc_collection_create_bulk_operation_with_opts(collPtr, optionsPointer) else {
                    fatalError("failed to initialize mongoc_bulk_operation_t")
                }
                return created
            }
            defer { mongoc_bulk_operation_destroy(bulk) }

            for (index, model) in self.models.enumerated() {
                if let id = try model.addToBulkWrite(bulk, encoder: self.encoder) {
                    insertedIDs[index] = id
                }
            }

            var error = bson_error_t()
            let (serverID, reply) = withStackAllocatedMutableBSONPointer { replyPtr -> (UInt32, BSONDocument) in
                let executedOn = mongoc_bulk_operation_execute(bulk, replyPtr, &error)
                return (executedOn, BSONDocument(copying: replyPtr))
            }

            let writeConcernAcknowledged: Bool
            if session?.inTransaction == true {
                // Bulk write operations in transactions must get their write concern from the session, not from
                // the `BulkWriteOptions` passed to the `bulkWrite` helper. `libmongoc` surfaces this
                // implementation detail by nulling out the write concern stored on the bulk write. To sidestep
                // this, we can only call `mongoc_bulk_operation_get_write_concern` out of a transaction.
                //
                // In a transaction, default to writeConcernAcknowledged = true. This is acceptable because
                // transactions do not support unacknowledged writes.
                writeConcernAcknowledged = true
            } else {
                let writeConcern = WriteConcern(copying: mongoc_bulk_operation_get_write_concern(bulk))
                writeConcernAcknowledged = writeConcern.isAcknowledged
            }

            // The result is built before the failure check so that it can be attached to the thrown error as a
            // partial result.
            let result: BulkWriteResult?
            if writeConcernAcknowledged {
                result = try BulkWriteResult(reply: reply, insertedIDs: insertedIDs)
            } else {
                result = nil
            }

            // A zero return value from `mongoc_bulk_operation_execute` is treated as failure.
            if serverID == 0 {
                throw extractBulkWriteError(
                    for: self,
                    error: error,
                    reply: reply,
                    partialResult: result
                )
            }

            return result
        }
    }
}
/// Options that control how a bulk write is performed on a `MongoCollection`.
public struct BulkWriteOptions: Codable {
    /// If `true`, allows the write to opt-out of document level validation.
    public var bypassDocumentValidation: Bool?

    /**
     * If `true` (the default), operations will be executed serially in order
     * and a write error will abort execution of the entire bulk write. If
     * `false`, operations may execute in an arbitrary order and execution will
     * not stop after encountering a write error (i.e. multiple errors may be
     * reported after all operations have been attempted).
     */
    public var ordered: Bool

    /// An optional WriteConcern to use for the bulk write.
    public var writeConcern: WriteConcern?

    /// Convenience initializer allowing any/all parameters to be omitted or optional. A `nil` `ordered` is
    /// stored as `true`.
    public init(
        bypassDocumentValidation: Bool? = nil,
        ordered: Bool? = nil,
        writeConcern: WriteConcern? = nil
    ) {
        self.bypassDocumentValidation = bypassDocumentValidation
        self.ordered = ordered ?? true
        self.writeConcern = writeConcern
    }

    /// Internal initializer used to convert an `InsertManyOptions` optional to a `BulkWriteOptions` optional.
    /// Produces `nil` when given `nil`; otherwise copies the three shared options across.
    internal init?(from insertManyOptions: InsertManyOptions?) {
        guard let source = insertManyOptions else {
            return nil
        }
        self.init(
            bypassDocumentValidation: source.bypassDocumentValidation,
            ordered: source.ordered,
            writeConcern: source.writeConcern
        )
    }

    private enum CodingKeys: String, CodingKey {
        case bypassDocumentValidation
        case ordered
        case writeConcern
    }

    // Decoding is written by hand so that a missing `ordered` key falls back to `true` while the property itself
    // stays non-optional.
    public init(from decoder: Decoder) throws {
        let values = try decoder.container(keyedBy: CodingKeys.self)
        self.bypassDocumentValidation = try values.decodeIfPresent(Bool.self, forKey: .bypassDocumentValidation)
        self.ordered = try values.decodeIfPresent(Bool.self, forKey: .ordered) ?? true
        self.writeConcern = try values.decodeIfPresent(WriteConcern.self, forKey: .writeConcern)
    }
}
/// The result of a bulk write operation on a `MongoCollection`.
public struct BulkWriteResult: Codable {
    /// Number of documents deleted.
    public let deletedCount: Int

    /// Number of documents inserted.
    public let insertedCount: Int

    /// Map of the index of the operation to the id of the inserted document.
    public let insertedIDs: [Int: BSON]

    /// Number of documents matched for update.
    public let matchedCount: Int

    /// Number of documents modified.
    public let modifiedCount: Int

    /// Number of documents upserted.
    public let upsertedCount: Int

    /// Map of the index of the operation to the id of the upserted document.
    public let upsertedIDs: [Int: BSON]

    private enum CodingKeys: String, CodingKey {
        case deletedCount,
             insertedCount,
             insertedIDs = "insertedIds",
             matchedCount,
             modifiedCount,
             upsertedCount,
             upsertedIDs = "upsertedIds"
    }

    // The key names libmongoc uses for its bulk write results.
    private enum MongocKeys: String {
        case deletedCount = "nRemoved"
        case insertedCount = "nInserted"
        case matchedCount = "nMatched"
        case modifiedCount = "nModified"
        case upsertedCount = "nUpserted"
        case upserted
    }

    /// Decodes a result in which every field is optional, but at least one must be present.
    ///
    /// Missing counts default to `0`, except `insertedCount` and `upsertedCount`, which default to the number of
    /// entries in the corresponding decoded ID map.
    ///
    /// - Throws: `DecodingError.valueNotFound` if none of the expected keys is present.
    public init(from decoder: Decoder) throws {
        let container = try decoder.container(keyedBy: CodingKeys.self)

        // None of the results must be present themselves, but at least one must.
        guard !container.allKeys.isEmpty else {
            throw DecodingError.valueNotFound(
                BulkWriteResult.self,
                DecodingError.Context(
                    codingPath: decoder.codingPath,
                    debugDescription: "No results found"
                )
            )
        }

        self.deletedCount = try container.decodeIfPresent(Int.self, forKey: .deletedCount) ?? 0
        self.matchedCount = try container.decodeIfPresent(Int.self, forKey: .matchedCount) ?? 0
        self.modifiedCount = try container.decodeIfPresent(Int.self, forKey: .modifiedCount) ?? 0

        let insertedIDs = try container.decodeIfPresent([Int: BSON].self, forKey: .insertedIDs) ?? [:]
        self.insertedIDs = insertedIDs
        self.insertedCount = try container.decodeIfPresent(Int.self, forKey: .insertedCount) ?? insertedIDs.count

        let upsertedIDs = try container.decodeIfPresent([Int: BSON].self, forKey: .upsertedIDs) ?? [:]
        self.upsertedIDs = upsertedIDs
        self.upsertedCount = try container.decodeIfPresent(Int.self, forKey: .upsertedCount) ?? upsertedIDs.count
    }

    /**
     * Create a `BulkWriteResult` from a reply and map of inserted IDs.
     *
     * Note: we forgo using a Decodable initializer because we still need to
     * build a map for `upsertedIDs` and explicitly add `insertedIDs`, and because
     * libmongoc uses a different naming convention for the field names than is expected.
     *
     * While `mongoc_bulk_operation_execute()` guarantees that `reply` will be
     * initialized, it doesn't guarantee that all fields will be set. On error,
     * we should expect fields to be missing and handle that gracefully.
     *
     * The initializer produces `nil` when `reply` contains none of the keys listed in `MongocKeys`.
     *
     * - Parameters:
     *   - reply: A `BSONDocument` result from `mongoc_bulk_operation_execute()`
     *   - insertedIDs: Map of inserted IDs
     *
     * - Throws:
     *   - `MongoError.InternalError` if an unexpected error occurs reading the reply from the server.
     */
    fileprivate init?(reply: BSONDocument, insertedIDs: [Int: BSON]) throws {
        var deletedCount: Int?
        var insertedCount: Int?
        var matchedCount: Int?
        var modifiedCount: Int?
        var upsertedCount: Int?
        var upsertedIDs = [Int: BSON]()

        // To improve the performance of this initializer, we perform only a single walk over the entire document and
        // record the values as they are encountered instead of doing repeated random lookups, since each lookup
        // would result in a traversal of the document.
        var seenKey = false
        for (k, v) in reply {
            // Keys libmongoc may add that we do not track are skipped.
            guard let key = MongocKeys(rawValue: k) else {
                continue
            }
            seenKey = true

            switch key {
            case .deletedCount:
                deletedCount = v.toInt()
            case .insertedCount:
                insertedCount = v.toInt()
            case .matchedCount:
                matchedCount = v.toInt()
            case .modifiedCount:
                modifiedCount = v.toInt()
            case .upsertedCount:
                upsertedCount = v.toInt()
            case .upserted:
                // Read the array from the value already in hand rather than looking the key up in `reply` again,
                // which would traverse the document a second time. Non-document elements are dropped.
                if let upserted = v.arrayValue?.compactMap({ $0.documentValue }) {
                    for upsert in upserted {
                        guard let index = upsert["index"]?.toInt() else {
                            throw MongoError.InternalError(message: "Could not cast upserted index to `Int`")
                        }
                        upsertedIDs[index] = upsert["_id"]
                    }
                }
            }
        }

        // A reply without any recognised key carries no result information.
        guard seenKey else {
            return nil
        }

        self.deletedCount = deletedCount ?? 0
        self.insertedCount = insertedCount ?? 0
        self.insertedIDs = insertedIDs
        self.matchedCount = matchedCount ?? 0
        self.modifiedCount = modifiedCount ?? 0
        self.upsertedCount = upsertedCount ?? 0
        self.upsertedIDs = upsertedIDs
    }

    /// Internal initializer used for testing purposes and error handling. Omitted counts become `0` and omitted
    /// ID maps become empty.
    internal init(
        deletedCount: Int? = nil,
        insertedCount: Int? = nil,
        insertedIDs: [Int: BSON]? = nil,
        matchedCount: Int? = nil,
        modifiedCount: Int? = nil,
        upsertedCount: Int? = nil,
        upsertedIDs: [Int: BSON]? = nil
    ) {
        self.deletedCount = deletedCount ?? 0
        self.insertedCount = insertedCount ?? 0
        self.insertedIDs = insertedIDs ?? [:]
        self.matchedCount = matchedCount ?? 0
        self.modifiedCount = modifiedCount ?? 0
        self.upsertedCount = upsertedCount ?? 0
        self.upsertedIDs = upsertedIDs ?? [:]
    }
}