-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
stop
(1 row)
+
+starting permutation: s0_init s0_begin s0_truncate s2_begin s2_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s2_commit s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_truncate: TRUNCATE tbl1;
+step s2_begin: BEGIN;
+step s2_truncate: TRUNCATE tbl2;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s2_commit: COMMIT;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl2: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
setup
{
DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl2;
CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE TABLE tbl2 (val1 integer, val2 integer);
}
teardown
{
DROP TABLE tbl1;
+ DROP TABLE tbl2;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
step "s1_checkpoint" { CHECKPOINT; }
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_begin" { BEGIN; }
+step "s2_truncate" { TRUNCATE tbl2; }
+step "s2_commit" { COMMIT; }
+
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
# during the first checkpoint execution. This transaction must be marked as
# record written by bgwriter. One might think we can either stop the bgwriter or
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can purge the old catalog modifying transactions after restoring
+# them from the serialized snapshot. The first checkpoint will serialize the list
+# of two catalog modifying xacts. The purpose of the second checkpoint is to allow
+# partial pruning of the list of catalog modifying xact. The third checkpoint
+# followed by get_changes establishes a restart_point at the first checkpoint LSN.
+# The last get_changes will start decoding from the first checkpoint which
+# restores the list of catalog modifying xacts and then while decoding the second
+# checkpoint record it prunes one of the xacts in that list and when decoding the
+# next checkpoint, it will completely prune that list.
+permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
pfree(workspace);
/*
- * Either all the xacts got purged or none. It is only possible to
- * partially remove the xids from this array if one or more of the xids
- * are still running but not all. That can happen if we start decoding
- * from a point (LSN where the snapshot state became consistent) where all
- * the xacts in this were running and then at least one of those got
- * committed and a few are still running. We will never start from such a
- * point because we won't move the slot's restart_lsn past the point where
- * the oldest running transaction's restart_decoding_lsn is.
+ * Purge xids in ->catchange as well. The purged array must also be sorted
+ * in xidComparator order.
*/
- if (builder->catchange.xcnt == 0 ||
- TransactionIdFollowsOrEquals(builder->catchange.xip[0],
- builder->xmin))
- return;
+ if (builder->catchange.xcnt > 0)
+ {
+ /*
+ * Since catchange.xip is sorted, we find the lower bound of xids that
+ * are still interesting.
+ */
+ for (off = 0; off < builder->catchange.xcnt; off++)
+ {
+ if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
+ builder->xmin))
+ break;
+ }
- Assert(TransactionIdFollows(builder->xmin,
- builder->catchange.xip[builder->catchange.xcnt - 1]));
- pfree(builder->catchange.xip);
- builder->catchange.xip = NULL;
- builder->catchange.xcnt = 0;
+ surviving_xids = builder->catchange.xcnt - off;
- elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
- builder->xmin);
+ if (surviving_xids > 0)
+ {
+ memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+ surviving_xids * sizeof(TransactionId));
+ }
+ else
+ {
+ pfree(builder->catchange.xip);
+ builder->catchange.xip = NULL;
+ }
+
+ elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
+ (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+ builder->xmin, builder->xmax);
+ builder->catchange.xcnt = surviving_xids;
+ }
}
/*