/* data follows */
} ReorderBufferDiskChange;
+#define IsSpecInsert(action) \
+( \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
+)
+#define IsSpecConfirm(action) \
+( \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
+)
+#define IsInsertOrUpdate(action) \
+( \
+ (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
+ ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
+ ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
+)
+
/*
* Maximum number of changes kept in memory, per transaction. After that,
* changes are spooled to disk.
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
ReorderBufferTXN *txn, CommandId cid);
+/*
+ * ---------------------------------------
+ * Streaming support functions
+ * ---------------------------------------
+ */
+static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
+static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
+static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
/* ---------------------------------------
* toast reassembly support
* ---------------------------------------
dlist_init(&txn->tuplecids);
dlist_init(&txn->subtxns);
+ /* InvalidCommandId is not zero, so set it explicitly */
+ txn->command_id = InvalidCommandId;
+
return txn;
}
}
/*
- * Free an ReorderBufferChange.
+ * Free a ReorderBufferChange and update memory accounting, if requested.
*/
void
-ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
+ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
+ bool upd_mem)
{
/* update memory accounting info */
- ReorderBufferChangeMemoryUpdate(rb, change, false);
+ if (upd_mem)
+ ReorderBufferChangeMemoryUpdate(rb, change, false);
/* free contained data */
switch (change->action)
}
/*
- * Queue a change into a transaction so it can be replayed upon commit.
+ * Record the partial change for the streaming of in-progress transactions. We
+ * can stream only complete changes so if we have a partial change like toast
+ * table insert or speculative insert then we mark such a 'txn' so that it
+ * can't be streamed. We also ensure that if the changes in such a 'txn' are
+ * above logical_decoding_work_mem threshold then we stream them as soon as we
+ * have a complete change.
+ */
+static void
+ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferChange *change,
+ bool toast_insert)
+{
+ ReorderBufferTXN *toptxn;
+
+ /*
+ * The partial changes need to be processed only while streaming
+ * in-progress transactions.
+ */
+ if (!ReorderBufferCanStream(rb))
+ return;
+
+ /* Get the top transaction. */
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
+
+ /*
+ * Set the toast insert bit whenever we get toast insert to indicate a
+ * partial change and clear it when we get the insert or update on main
+ * table (Both update and insert will do the insert in the toast table).
+ */
+ if (toast_insert)
+ toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
+ else if (rbtxn_has_toast_insert(toptxn) &&
+ IsInsertOrUpdate(change->action))
+ toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+
+ /*
+ * Set the spec insert bit whenever we get the speculative insert to
+ * indicate the partial change and clear the same on speculative confirm.
+ */
+ if (IsSpecInsert(change->action))
+ toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
+ else if (IsSpecConfirm(change->action))
+ {
+ /*
+ * Speculative confirm change must be preceded by speculative
+ * insertion.
+ */
+ Assert(rbtxn_has_spec_insert(toptxn));
+ toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
+ }
+
+ /*
+ * Stream the transaction if it is serialized before and the changes are
+ * now complete in the top-level transaction.
+ *
+ * The reason for doing the streaming of such a transaction as soon as we
+ * get the complete change for it is that previously it would have reached
+ * the memory threshold and wouldn't get streamed because of incomplete
+ * changes. Delaying such transactions would increase apply lag for them.
+ */
+ if (ReorderBufferCanStartStreaming(rb) &&
+ !(rbtxn_has_incomplete_tuple(toptxn)) &&
+ rbtxn_is_serialized(txn))
+ ReorderBufferStreamTXN(rb, toptxn);
+}
+
+/*
+ * Queue a change into a transaction so it can be replayed upon commit or will be
+ * streamed when we reach logical_decoding_work_mem threshold.
*/
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
- ReorderBufferChange *change)
+ ReorderBufferChange *change, bool toast_insert)
{
ReorderBufferTXN *txn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ /*
+ * While streaming the previous changes we have detected that the
+ * transaction is aborted. So there is no point in collecting further
+ * changes for it.
+ */
+ if (txn->concurrent_abort)
+ {
+ /*
+ * We don't need to update memory accounting for this change as we
+ * have not added it to the queue yet.
+ */
+ ReorderBufferReturnChange(rb, change, false);
+ return;
+ }
+
change->lsn = lsn;
change->txn = txn;
/* update memory accounting information */
ReorderBufferChangeMemoryUpdate(rb, change, true);
+ /* process partial change */
+ ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
+
/* check the memory limits and evict something if needed */
ReorderBufferCheckMemoryLimit(rb);
}
change->data.msg.message = palloc(message_size);
memcpy(change->data.msg.message, message, message_size);
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
MemoryContextSwitchTo(oldcontext);
}
#endif
}
+/*
+ * AssertChangeLsnOrder
+ *
+ * Check ordering of changes in the (sub)transaction.
+ */
+static void
+AssertChangeLsnOrder(ReorderBufferTXN *txn)
+{
+#ifdef USE_ASSERT_CHECKING
+ dlist_iter iter;
+ XLogRecPtr prev_lsn = txn->first_lsn;
+
+ dlist_foreach(iter, &txn->changes)
+ {
+ ReorderBufferChange *cur_change;
+
+ cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ Assert(txn->first_lsn != InvalidXLogRecPtr);
+ Assert(cur_change->lsn != InvalidXLogRecPtr);
+ Assert(txn->first_lsn <= cur_change->lsn);
+
+ if (txn->end_lsn != InvalidXLogRecPtr)
+ Assert(cur_change->lsn <= txn->end_lsn);
+
+ Assert(prev_lsn <= cur_change->lsn);
+
+ prev_lsn = cur_change->lsn;
+ }
+#endif
+}
+
/*
* ReorderBufferGetOldestTXN
* Return oldest transaction in reorderbuffer
*iter_state = NULL;
+ /* Check ordering of changes in the toplevel transaction. */
+ AssertChangeLsnOrder(txn);
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
+ /* Check ordering of changes in this subtransaction. */
+ AssertChangeLsnOrder(cur_txn);
+
if (cur_txn->nentries > 0)
nr_txns++;
}
{
change = dlist_container(ReorderBufferChange, node,
dlist_pop_head_node(&state->old_change));
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
Assert(dlist_is_empty(&state->old_change));
}
change = dlist_container(ReorderBufferChange, node,
dlist_pop_head_node(&state->old_change));
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
Assert(dlist_is_empty(&state->old_change));
}
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
/*
Assert(change->txn == txn);
Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
/*
dlist_delete(&txn->base_snapshot_node);
}
+ /*
+ * Cleanup the snapshot for the last streamed run.
+ */
+ if (txn->snapshot_now != NULL)
+ {
+ Assert(rbtxn_is_streamed(txn));
+ ReorderBufferFreeSnap(rb, txn->snapshot_now);
+ }
+
/*
* Remove TXN from its containing list.
*
ReorderBufferReturnTXN(rb, txn);
}
+/*
+ * Discard changes from a transaction (and subtransactions), after streaming
+ * them. Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
+ */
+static void
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ dlist_mutable_iter iter;
+
+ /* cleanup subtransactions & their changes */
+ dlist_foreach_modify(iter, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+ /*
+ * Subtransactions are always associated to the toplevel TXN, even if
+ * they originally were happening inside another subtxn, so we won't
+ * ever recurse more than one level deep here.
+ */
+ Assert(rbtxn_is_known_subxact(subtxn));
+ Assert(subtxn->nsubtxns == 0);
+
+ ReorderBufferTruncateTXN(rb, subtxn);
+ }
+
+ /* cleanup changes in the toplevel txn */
+ dlist_foreach_modify(iter, &txn->changes)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+
+ /* remove the change from it's containing list */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+
+ /*
+ * Mark the transaction as streamed.
+ *
+ * The toplevel transaction, identified by (toptxn==NULL), is marked as
+ * streamed always, even if it does not contain any changes (that is, when
+ * all the changes are in subtransactions).
+ *
+ * For subtransactions, we only mark them as streamed when there are
+ * changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts for
+ * XIDs the downstream is not aware of. And of course, it always knows
+ * about the toplevel xact (we send the XID in all messages), but we never
+ * stream XIDs of empty subxacts.
+ */
+ if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ txn->txn_flags |= RBTXN_IS_STREAMED;
+
+ /*
+ * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
+ * memory. We could also keep the hash table and update it with new ctid
+ * values, but this seems simpler and good enough for now.
+ */
+ if (txn->tuplecid_hash != NULL)
+ {
+ hash_destroy(txn->tuplecid_hash);
+ txn->tuplecid_hash = NULL;
+ }
+
+ /* If this txn is serialized then clean the disk space. */
+ if (rbtxn_is_serialized(txn))
+ {
+ ReorderBufferRestoreCleanup(rb, txn);
+ txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+ }
+
+ /* also reset the number of entries in the transaction */
+ txn->nentries_mem = 0;
+ txn->nentries = 0;
+}
+
/*
* Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
* HeapTupleSatisfiesHistoricMVCC.
}
/*
- * Perform the replay of a transaction and its non-aborted subtransactions.
- *
- * Subtransactions previously have to be processed by
- * ReorderBufferCommitChild(), even if previously assigned to the toplevel
- * transaction with ReorderBufferAssignChild.
- *
- * We currently can only decode a transaction's contents when its commit
- * record is read because that's the only place where we know about cache
- * invalidations. Thus, once a toplevel commit is read, we iterate over the top
- * and subtransactions (using a k-way merge) and replay the changes in lsn
- * order.
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
- XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
- ReorderBufferTXN *txn;
- volatile Snapshot snapshot_now;
- volatile CommandId command_id = FirstCommandId;
- bool using_subtxn;
- ReorderBufferIterTXNState *volatile iterstate = NULL;
+ /* we should only call this for previously streamed transactions */
+ Assert(rbtxn_is_streamed(txn));
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
+ ReorderBufferStreamTXN(rb, txn);
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
+ rb->stream_commit(rb, txn, txn->final_lsn);
- txn->final_lsn = commit_lsn;
- txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
- txn->origin_id = origin_id;
- txn->origin_lsn = origin_lsn;
+ ReorderBufferCleanupTXN(rb, txn);
+}
+/*
+ * Set xid to detect concurrent aborts.
+ *
+ * While streaming an in-progress transaction there is a possibility that the
+ * (sub)transaction might get aborted concurrently. In such case if the
+ * (sub)transaction has catalog update then we might decode the tuple using
+ * wrong catalog version. For example, suppose there is one catalog tuple with
+ * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
+ * and after that we will have two tuples (xmin: 500, xmax: 501) and
+ * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
+ * say 502 updates the same catalog tuple then the first tuple will be changed
+ * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
+ * the tuple inserted/updated in 501 after the catalog update, we will see the
+ * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
+ * consider that the tuple is deleted by xid 502 which is not visible to our
+ * snapshot. And when we will try to decode with that catalog tuple, it can
+ * lead to a wrong result or a crash. So, it is necessary to detect
+ * concurrent aborts to allow streaming of in-progress transactions.
+ *
+ * For detecting the concurrent abort we set CheckXidAlive to the current
+ * (sub)transaction's xid for which this change belongs to. And, during
+ * catalog scan we can check the status of the xid and if it is aborted we will
+ * report a specific error so that we can stop streaming current transaction
+ * and discard the already streamed changes on such an error. We might have
+ * already streamed some of the changes for the aborted (sub)transaction, but
+ * that is fine because when we decode the abort we will stream abort message
+ * to truncate the changes in the subscriber.
+ */
+static inline void
+SetupCheckXidLive(TransactionId xid)
+{
/*
- * If this transaction has no snapshot, it didn't make any changes to the
- * database, so there's nothing to decode. Note that
- * ReorderBufferCommitChild will have transferred any snapshots from
- * subtransactions if there were any.
+ * If the input transaction id is already set as a CheckXidAlive then
+ * nothing to do.
*/
- if (txn->base_snapshot == NULL)
- {
- Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
+ if (TransactionIdEquals(CheckXidAlive, xid))
return;
+
+ /*
+ * setup CheckXidAlive if it's not committed yet. We don't check if the
+ * xid is aborted. That will happen during catalog access.
+ */
+ if (!TransactionIdDidCommit(xid))
+ CheckXidAlive = xid;
+ else
+ CheckXidAlive = InvalidTransactionId;
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying change.
+ */
+static inline void
+ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ if (streaming)
+ rb->stream_change(rb, txn, relation, change);
+ else
+ rb->apply_change(rb, txn, relation, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the truncate.
+ */
+static inline void
+ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ int nrelations, Relation *relations,
+ ReorderBufferChange *change, bool streaming)
+{
+ if (streaming)
+ rb->stream_truncate(rb, txn, nrelations, relations, change);
+ else
+ rb->apply_truncate(rb, txn, nrelations, relations, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the message.
+ */
+static inline void
+ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferChange *change, bool streaming)
+{
+ if (streaming)
+ rb->stream_message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+ else
+ rb->message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+}
+
+/*
+ * Function to store the command id and snapshot at the end of the current
+ * stream so that we can reuse the same while sending the next stream.
+ */
+static inline void
+ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Snapshot snapshot_now, CommandId command_id)
+{
+ txn->command_id = command_id;
+
+ /* Avoid copying if it's already copied. */
+ if (snapshot_now->copied)
+ txn->snapshot_now = snapshot_now;
+ else
+ txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+ txn, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to handle the concurrent
+ * abort of the streaming transaction. This resets the TXN such that it
+ * can be used to stream the remaining data of transaction being processed.
+ */
+static void
+ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Snapshot snapshot_now,
+ CommandId command_id,
+ XLogRecPtr last_lsn,
+ ReorderBufferChange *specinsert)
+{
+ /* Discard the changes that we just streamed */
+ ReorderBufferTruncateTXN(rb, txn);
+
+ /* Free all resources allocated for toast reconstruction */
+ ReorderBufferToastReset(rb, txn);
+
+ /* Return the spec insert change if it is not NULL */
+ if (specinsert != NULL)
+ {
+ ReorderBufferReturnChange(rb, specinsert, true);
+ specinsert = NULL;
}
- snapshot_now = txn->base_snapshot;
+ /* Stop the stream. */
+ rb->stream_stop(rb, txn, last_lsn);
+
+ /* Remember the command ID and snapshot for the streaming run */
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ *
+ * Send data of a transaction (and its subtransactions) to the
+ * output plugin. We iterate over the top and subtransactions (using a k-way
+ * merge) and replay the changes in lsn order.
+ *
+ * If streaming is true then data will be sent using stream API.
+ */
+static void
+ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn,
+ volatile Snapshot snapshot_now,
+ volatile CommandId command_id,
+ bool streaming)
+{
+ bool using_subtxn;
+ MemoryContext ccxt = CurrentMemoryContext;
+ ReorderBufferIterTXNState *volatile iterstate = NULL;
+ volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
+ ReorderBufferChange *volatile specinsert = NULL;
+ volatile bool stream_started = false;
+ ReorderBufferTXN *volatile curtxn = NULL;
/* build data to be able to lookup the CommandIds of catalog tuples */
ReorderBufferBuildTupleCidHash(rb, txn);
PG_TRY();
{
ReorderBufferChange *change;
- ReorderBufferChange *specinsert = NULL;
if (using_subtxn)
- BeginInternalSubTransaction("replay");
+ BeginInternalSubTransaction(streaming ? "stream" : "replay");
else
StartTransactionCommand();
- rb->begin(rb, txn);
+ /* We only need to send begin/commit for non-streamed transactions. */
+ if (!streaming)
+ rb->begin(rb, txn);
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
Relation relation = NULL;
Oid reloid;
+ /*
+ * We can't call start stream callback before processing first
+ * change.
+ */
+ if (prev_lsn == InvalidXLogRecPtr)
+ {
+ if (streaming)
+ {
+ txn->origin_id = change->origin_id;
+ rb->stream_start(rb, txn, change->lsn);
+ stream_started = true;
+ }
+ }
+
+ /*
+ * Enforce correct ordering of changes, merged from multiple
+ * subtransactions. The changes may have the same LSN due to
+ * MULTI_INSERT xlog records.
+ */
+ Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
+
+ prev_lsn = change->lsn;
+
+ /* Set the current xid to detect concurrent aborts. */
+ if (streaming)
+ {
+ curtxn = change->txn;
+ SetupCheckXidLive(curtxn->xid);
+ }
+
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
if (!IsToastRelation(relation))
{
ReorderBufferToastReplace(rb, txn, relation, change);
- rb->apply_change(rb, txn, relation, change);
+ ReorderBufferApplyChange(rb, txn, relation, change,
+ streaming);
/*
* Only clear reassembled toast chunks if we're sure
*/
if (specinsert != NULL)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
- if (relation != NULL)
+ if (RelationIsValid(relation))
{
RelationClose(relation);
relation = NULL;
/* clear out a pending (and thus failed) speculation */
if (specinsert != NULL)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
relations[nrelations++] = relation;
}
- rb->apply_truncate(rb, txn, nrelations, relations, change);
+ /* Apply the truncate. */
+ ReorderBufferApplyTruncate(rb, txn, nrelations,
+ relations, change,
+ streaming);
for (i = 0; i < nrelations; i++)
RelationClose(relations[i]);
}
case REORDER_BUFFER_CHANGE_MESSAGE:
- rb->message(rb, txn, change->lsn, true,
- change->data.msg.prefix,
- change->data.msg.message_size,
- change->data.msg.message);
+ ReorderBufferApplyMessage(rb, txn, change, streaming);
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
snapshot_now = change->data.snapshot;
}
-
/* and continue with the new one */
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
break;
*/
if (specinsert)
{
- ReorderBufferReturnChange(rb, specinsert);
+ ReorderBufferReturnChange(rb, specinsert, true);
specinsert = NULL;
}
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
- /* call commit callback */
- rb->commit(rb, txn, commit_lsn);
+ /*
+ * Done with current changes, send the last message for this set of
+ * changes depending upon streaming mode.
+ */
+ if (streaming)
+ {
+ if (stream_started)
+ {
+ rb->stream_stop(rb, txn, prev_lsn);
+ stream_started = false;
+ }
+ }
+ else
+ rb->commit(rb, txn, commit_lsn);
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
elog(ERROR, "output plugin used XID %u",
GetCurrentTransactionId());
+ /*
+ * Remember the command ID and snapshot for the next set of changes in
+ * streaming mode.
+ */
+ if (streaming)
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ else if (snapshot_now->copied)
+ ReorderBufferFreeSnap(rb, snapshot_now);
+
/* cleanup */
TeardownHistoricSnapshot(false);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
+ /*
+ * If we are streaming the in-progress transaction then discard the
+ * changes that we just streamed, and mark the transactions as
+ * streamed (if they contained changes). Otherwise, remove all the
+ * changes and deallocate the ReorderBufferTXN.
+ */
+ if (streaming)
+ {
+ ReorderBufferTruncateTXN(rb, txn);
- /* remove potential on-disk data, and deallocate */
- ReorderBufferCleanupTXN(rb, txn);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ ReorderBufferCleanupTXN(rb, txn);
}
PG_CATCH();
{
+ MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+ ErrorData *errdata = CopyErrorData();
+
/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
if (iterstate)
ReorderBufferIterTXNFinish(rb, iterstate);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
+ /*
+ * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
+ * abort of the (sub)transaction we are streaming. We need to do the
+ * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ */
+ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
+ {
+ /*
+ * This error can only occur when we are sending the data in
+ * streaming mode and the streaming is not finished yet.
+ */
+ Assert(streaming);
+ Assert(stream_started);
+
+ /* Cleanup the temporary error state. */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+ curtxn->concurrent_abort = true;
+
+ /* Reset the TXN so that it is allowed to stream remaining data. */
+ ReorderBufferResetTXN(rb, txn, snapshot_now,
+ command_id, prev_lsn,
+ specinsert);
+ }
+ else
+ {
+ ReorderBufferCleanupTXN(rb, txn);
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
- /* remove potential on-disk data, and deallocate */
- ReorderBufferCleanupTXN(rb, txn);
+/*
+ * Perform the replay of a transaction and its non-aborted subtransactions.
+ *
+ * Subtransactions previously have to be processed by
+ * ReorderBufferCommitChild(), even if previously assigned to the toplevel
+ * transaction with ReorderBufferAssignChild.
+ *
+ * This interface is called once a toplevel commit is read for both streamed
+ * as well as non-streamed transactions.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+ Snapshot snapshot_now;
+ CommandId command_id = FirstCommandId;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
- PG_RE_THROW();
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ /*
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
+ *
+ * Called after everything (origin ID, LSN, ...) is stored in the
+ * transaction to avoid passing that information directly.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ ReorderBufferStreamCommit(rb, txn);
+ return;
}
- PG_END_TRY();
+
+ /*
+ * If this transaction has no snapshot, it didn't make any changes to the
+ * database, so there's nothing to decode. Note that
+ * ReorderBufferCommitChild will have transferred any snapshots from
+ * subtransactions if there were any.
+ */
+ if (txn->base_snapshot == NULL)
+ {
+ Assert(txn->ninvalidations == 0);
+ ReorderBufferCleanupTXN(rb, txn);
+ return;
+ }
+
+ snapshot_now = txn->base_snapshot;
+
+ /* Process and send the changes to output plugin. */
+ ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+ command_id, false);
}
/*
if (txn == NULL)
return;
+ /* For streamed transactions notify the remote node about the abort. */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_abort(rb, txn, lsn);
+
+ /*
+ * We might have decoded changes for this transaction that could load
+ * the cache as per the current transaction's view (consider DDL's
+ * happened in this transaction). We don't want the decoding of future
+ * transactions to use those cache entries so execute invalidations.
+ */
+ if (txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ }
+
/* cosmetic... */
txn->final_lsn = lsn;
if (txn == NULL)
return;
+ /* For streamed transactions notify the remote node about the abort. */
+ if (rbtxn_is_streamed(txn))
+ rb->stream_abort(rb, txn, lsn);
+
/* cosmetic... */
txn->final_lsn = lsn;
change->data.snapshot = snap;
change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
}
/*
change->data.command_id = cid;
change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
- ReorderBufferQueueChange(rb, xid, lsn, change);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
}
/*
- * Update the memory accounting info. We track memory used by the whole
- * reorder buffer and the transaction containing the change.
+ * Update memory counters to account for the new or removed change.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
*/
static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
bool addition)
{
Size sz;
+ ReorderBufferTXN *txn;
+ ReorderBufferTXN *toptxn = NULL;
Assert(change->txn);
if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
return;
+ txn = change->txn;
+
+ /* If streaming supported, update the total size in top level as well. */
+ if (ReorderBufferCanStream(rb))
+ {
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
+ }
+
sz = ReorderBufferChangeSize(change);
if (addition)
{
- change->txn->size += sz;
+ txn->size += sz;
rb->size += sz;
+
+ /* Update the total size in the top transaction. */
+ if (toptxn)
+ toptxn->total_size += sz;
}
else
{
- Assert((rb->size >= sz) && (change->txn->size >= sz));
- change->txn->size -= sz;
+ Assert((rb->size >= sz) && (txn->size >= sz));
+ txn->size -= sz;
rb->size -= sz;
+
+ /* Update the total size in the top transaction. */
+ if (toptxn)
+ toptxn->total_size -= sz;
}
+
+ Assert(txn->size <= rb->size);
+ Assert((txn->size >= 0) && (rb->size >= 0));
}
/*
return largest;
}
+/*
+ * Find the largest toplevel transaction to evict (by streaming).
+ *
+ * This can be seen as an optimized version of ReorderBufferLargestTXN, which
+ * should give us the same transaction (because we don't update memory account
+ * for subtransaction with streaming, so it's always 0). But we can simply
+ * iterate over the limited number of toplevel transactions.
+ *
+ * Note that, we skip transactions that contains incomplete changes. There
+ * is a scope of optimization here such that we can select the largest transaction
+ * which has complete changes. But that will make the code and design quite complex
+ * and that might not be worth the benefit. If we plan to stream the transactions
+ * that contains incomplete changes then we need to find a way to partially
+ * stream/truncate the transaction changes in-memory and build a mechanism to
+ * partially truncate the spilled files. Additionally, whenever we partially
+ * stream the transaction we need to maintain the last streamed lsn and next time
+ * we need to restore from that segment and the offset in WAL. As we stream the
+ * changes from the top transaction and restore them subtransaction wise, we need
+ * to even remember the subxact from where we streamed the last change.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+{
+ dlist_iter iter;
+ Size largest_size = 0;
+ ReorderBufferTXN *largest = NULL;
+
+ /* Find the largest top-level transaction. */
+ dlist_foreach(iter, &rb->toplevel_by_lsn)
+ {
+ ReorderBufferTXN *txn;
+
+ txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+ if ((largest != NULL || txn->total_size > largest_size) &&
+ (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
+ {
+ largest = txn;
+ largest_size = txn->total_size;
+ }
+ }
+
+ return largest;
+}
+
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
{
/*
* Pick the largest transaction (or subtransaction) and evict it from
- * memory by serializing it to disk.
+ * memory by streaming, if possible. Otherwise, spill to disk.
*/
- txn = ReorderBufferLargestTXN(rb);
+ if (ReorderBufferCanStartStreaming(rb) &&
+ (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+ {
+ /* we know there has to be one, because the size is not zero */
+ Assert(txn && !txn->toptxn);
+ Assert(txn->total_size > 0);
+ Assert(rb->size >= txn->total_size);
- ReorderBufferSerializeTXN(rb, txn);
+ ReorderBufferStreamTXN(rb, txn);
+ }
+ else
+ {
+ /*
+ * Pick the largest transaction (or subtransaction) and evict it
+ * from memory by serializing it to disk.
+ */
+ txn = ReorderBufferLargestTXN(rb);
+
+ /* we know there has to be one, because the size is not zero */
+ Assert(txn);
+ Assert(txn->size > 0);
+ Assert(rb->size >= txn->size);
+
+ ReorderBufferSerializeTXN(rb, txn);
+ }
/*
* After eviction, the transaction should have no entries in memory,
ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
spilled++;
}
Assert(ondisk->change.action == change->action);
}
+/* Returns true, if the output plugin supports streaming, false, otherwise. */
+static inline bool
+ReorderBufferCanStream(ReorderBuffer *rb)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+
+ return ctx->streaming;
+}
+
+/* Returns true, if the streaming can be started now, false, otherwise. */
+static inline bool
+ReorderBufferCanStartStreaming(ReorderBuffer *rb)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+ SnapBuild *builder = ctx->snapshot_builder;
+
+ /*
+ * We can't start streaming immediately even if the streaming is enabled
+ * because we previously decoded this transaction and now just are
+ * restarting.
+ */
+ if (ReorderBufferCanStream(rb) &&
+ !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+ {
+ /* We must have a consistent snapshot by this time */
+ Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT);
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ Snapshot snapshot_now;
+ CommandId command_id;
+
+ /* We can never reach here for a subtransaction. */
+ Assert(txn->toptxn == NULL);
+
+ /*
+ * We can't make any assumptions about base snapshot here, similar to what
+ * ReorderBufferCommit() does. That relies on base_snapshot getting
+ * transferred from subxact in ReorderBufferCommitChild(), but that was
+ * not yet called as the transaction is in-progress.
+ *
+ * So just walk the subxacts and use the same logic here. But we only need
+ * to do that once, when the transaction is streamed for the first time.
+ * After that we need to reuse the snapshot from the previous run.
+ *
+ * Unlike DecodeCommit which adds xids of all the subtransactions in
+ * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
+ * but we do add them to subxip array instead via ReorderBufferCopySnap.
+ * This allows the catalog changes made in subtransactions decoded till
+ * now to be visible.
+ */
+ if (txn->snapshot_now == NULL)
+ {
+ dlist_iter subxact_i;
+
+ /* make sure this transaction is streamed for the first time */
+ Assert(!rbtxn_is_streamed(txn));
+
+ /* at the beginning we should have invalid command ID */
+ Assert(txn->command_id == InvalidCommandId);
+
+ dlist_foreach(subxact_i, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+ ReorderBufferTransferSnapToParent(txn, subtxn);
+ }
+
+ /*
+ * If this transaction has no snapshot, it didn't make any changes to
+ * the database till now, so there's nothing to decode.
+ */
+ if (txn->base_snapshot == NULL)
+ {
+ Assert(txn->ninvalidations == 0);
+ return;
+ }
+
+ command_id = FirstCommandId;
+ snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+ txn, command_id);
+ }
+ else
+ {
+ /* the transaction must have been already streamed */
+ Assert(rbtxn_is_streamed(txn));
+
+ /*
+ * Nah, we already have snapshot from the previous streaming run. We
+ * assume new subxacts can't move the LSN backwards, and so can't beat
+ * the LSN condition in the previous branch (so no need to walk
+ * through subxacts again). In fact, we must not do that as we may be
+ * using snapshot half-way through the subxact.
+ */
+ command_id = txn->command_id;
+
+ /*
+ * We can't use txn->snapshot_now directly because after the last
+ * streaming run, we might have got some new sub-transactions. So we
+ * need to add them to the snapshot.
+ */
+ snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
+ txn, command_id);
+
+ /* Free the previously copied snapshot. */
+ Assert(txn->snapshot_now->copied);
+ ReorderBufferFreeSnap(rb, txn->snapshot_now);
+ txn->snapshot_now = NULL;
+ }
+
+ /* Process and send the changes to output plugin. */
+ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
+ command_id, true);
+
+ Assert(dlist_is_empty(&txn->changes));
+ Assert(txn->nentries == 0);
+ Assert(txn->nentries_mem == 0);
+}
+
/*
* Size of a change in memory.
*/
dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
dlist_delete(&cleanup->node);
- ReorderBufferReturnChange(rb, cleanup);
+ ReorderBufferReturnChange(rb, cleanup, true);
}
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
dlist_container(ReorderBufferChange, node, it.cur);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change);
+ ReorderBufferReturnChange(rb, change, true);
}
}
BlockNumber blockno;
bool updated_mapping = false;
+ /*
+ * Return unresolved if tuplecid_data is not valid. That's because when
+ * streaming in-progress transactions we may run into tuples with the CID
+ * before actually decoding them. Think e.g. about INSERT followed by
+ * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
+ * INSERT. So in such cases, we assume the CID is from the future
+ * command.
+ */
+ if (tuplecid_data == NULL)
+ return false;
+
/* be careful about padding */
memset(&key, 0, sizeof(key));