* allocator, evicting the oldest changes would make it more likely the
* memory gets actually freed.
*
+ * We use a max-heap with transaction size as the key to efficiently find
+ * the largest transaction. While the max-heap is empty, we don't update
+ * the max-heap when updating the memory counter. Therefore, we can get
+ * the largest transaction in O(N) time, where N is the number of
+ * transactions including top-level transactions and subtransactions.
+ *
+ * We build the max-heap just before selecting the largest transactions
+ * if the number of transactions being decoded is higher than the threshold,
+ * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
+ * update the max-heap when updating the memory counter. The intention is
+ * to efficiently find the largest transaction in O(1) time instead of
+ * incurring the cost of memory counter updates (O(log N)). Once the number
+ * of transactions got lower than the threshold, we reset the max-heap
+ * (refer to ReorderBufferMaybeResetMaxHeap() for details).
+ *
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory. At that point we can't use the memory limit directly
* as we load the subxacts independently. One option to deal with this
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Threshold of the total number of top-level and sub transactions that
+ * controls whether we use the max-heap for tracking their sizes. Although
+ * using the max-heap to select the largest transaction is effective when
+ * there are many transactions being decoded, maintaining the max-heap while
+ * updating the memory statistics can be costly. Therefore, we use
+ * MaxConnections as the threshold so that we use the max-heap only when
+ * using subtransactions.
+ */
+#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
+
+/*
+ * A macro to check if the max-heap is ready to use and needs to be updated
+ * accordingly.
+ */
+#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
+static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
+static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
+ ReorderBufferTXN *txn,
bool addition, Size sz);
/*
buffer->outbufsize = 0;
buffer->size = 0;
+ /*
+ * The binaryheap is indexed for faster manipulations.
+ *
+ * We allocate the initial heap size greater than
+ * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
+ * until the threshold is exceeded.
+ */
+ buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
+ ReorderBufferTXNSizeCompare,
+ true, NULL);
+
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
{
/* update memory accounting info */
if (upd_mem)
- ReorderBufferChangeMemoryUpdate(rb, change, false,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
ReorderBufferChangeSize(change));
/* free contained data */
txn->nentries_mem++;
/* update memory accounting information */
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
/* process partial change */
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
}
/*
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
+
+ /*
+ * After cleaning up one transaction, the number of transactions might get
+ * lower than the threshold for the max-heap.
+ */
+ ReorderBufferMaybeResetMaxHeap(rb);
}
/*
/* remove the change from it's containing list */
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
/*
* Mark the transaction as streamed.
*
* decide if we reached the memory limit, the transaction counter allows
* us to quickly pick the largest transaction for eviction.
*
+ * Either txn or change must be non-NULL at least. We update the memory
+ * counter of txn if it's non-NULL, otherwise change->txn.
+ *
* When streaming is enabled, we need to update the toplevel transaction
* counters instead - we don't really care about subtransactions as we
* can't stream them individually anyway, and we only pick toplevel
static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
+ ReorderBufferTXN *txn,
bool addition, Size sz)
{
- ReorderBufferTXN *txn;
ReorderBufferTXN *toptxn;
- Assert(change->txn);
+ Assert(txn || change);
/*
* Ignore tuple CID changes, because those are not evicted when reaching
* memory limit. So we just don't count them, because it might easily
* trigger a pointless attempt to spill.
*/
- if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+ if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
return;
- txn = change->txn;
+ if (sz == 0)
+ return;
+
+ if (txn == NULL)
+ txn = change->txn;
+ Assert(txn != NULL);
/*
* Update the total size in top level as well. This is later used to
/* Update the total size in the top transaction. */
toptxn->total_size += sz;
+
+ /* Update the max-heap as well if necessary */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ {
+ if ((txn->size - sz) == 0)
+ binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+ else
+ binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+ }
}
else
{
/* Update the total size in the top transaction. */
toptxn->total_size -= sz;
+
+ /* Update the max-heap as well if necessary */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ {
+ if (txn->size == 0)
+ binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+ else
+ binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+ }
}
Assert(txn->size <= rb->size);
}
}
+
+/* Compare two transactions by size */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+ ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
+ ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+ if (ta->size < tb->size)
+ return -1;
+ if (ta->size > tb->size)
+ return 1;
+ return 0;
+}
+
/*
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
+ * Build the max-heap. The heap assembly step is deferred until the end, for
+ * efficiency.
*/
-static ReorderBufferTXN *
-ReorderBufferLargestTXN(ReorderBuffer *rb)
+static void
+ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
{
HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent;
- ReorderBufferTXN *largest = NULL;
+
+ Assert(binaryheap_empty(rb->txn_heap));
hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
ReorderBufferTXN *txn = ent->txn;
- /* if the current transaction is larger, remember it */
- if ((!largest) || (txn->size > largest->size))
- largest = txn;
+ if (txn->size == 0)
+ continue;
+
+ binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
}
+ binaryheap_build(rb->txn_heap);
+}
+
+/*
+ * Reset the max-heap if the number of transactions got lower than the
+ * threshold.
+ */
+static void
+ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
+{
+ /*
+ * If we add and remove transactions right around the threshold, we could
+ * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
+ * reset the max-heap.
+ */
+ if (ReorderBufferMaxHeapIsReady(rb) &&
+ binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
+ binaryheap_reset(rb->txn_heap);
+}
+
+/*
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk)
+ * by doing a linear search or using the max-heap depending on the number of
+ * transactions in ReorderBuffer. Refer to the comments atop this file for the
+ * algorithm details.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTXN(ReorderBuffer *rb)
+{
+ ReorderBufferTXN *largest = NULL;
+
+ if (!ReorderBufferMaxHeapIsReady(rb))
+ {
+ /*
+ * If the number of transactions are small, we scan all transactions
+ * being decoded to get the largest transaction. This saves the cost
+ * of building a max-heap with a small number of transactions.
+ */
+ if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
+ {
+ HASH_SEQ_STATUS hash_seq;
+ ReorderBufferTXNByIdEnt *ent;
+
+ hash_seq_init(&hash_seq, rb->by_txn);
+ while ((ent = hash_seq_search(&hash_seq)) != NULL)
+ {
+ ReorderBufferTXN *txn = ent->txn;
+
+ /* if the current transaction is larger, remember it */
+ if ((!largest) || (txn->size > largest->size))
+ largest = txn;
+ }
+
+ Assert(largest);
+ }
+ else
+ {
+ /*
+ * There are a large number of transactions in ReorderBuffer. We
+ * build the max-heap for efficiently selecting the largest
+ * transactions.
+ */
+ ReorderBufferBuildMaxHeap(rb);
+
+ /*
+ * The max-heap is ready now. We remain the max-heap at least
+ * until we free up enough transactions to bring the total memory
+ * usage below the limit. The largest transaction is selected
+ * below.
+ */
+ Assert(ReorderBufferMaxHeapIsReady(rb));
+ }
+ }
+
+ /* Get the largest transaction from the max-heap */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ largest = (ReorderBufferTXN *)
+ DatumGetPointer(binaryheap_first(rb->txn_heap));
+
Assert(largest);
Assert(largest->size > 0);
Assert(largest->size <= rb->size);
/* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * 1024L);
+
+ /*
+ * After evicting some transactions, the number of transactions might get
+ * lower than the threshold for the max-heap.
+ */
+ ReorderBufferMaybeResetMaxHeap(rb);
+
}
/*
ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
spilled++;
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
+
/* update the statistics iff we have spilled anything */
if (spilled)
{
* update the accounting too (subtracting the size from the counters). And
* we don't want to underflow there.
*/
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
}
MemoryContextSwitchTo(oldcontext);
/* subtract the old change size */
- ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
/* now add the change back, with the correct size */
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
}