Regression tests need to use skip-empty-xacts = false, because there
might be accidental concurrent activity (like autovacuum), particularly
on slow machines. The tests added by
80901b3291 failed to do that in a
couple places, triggering occasional failures on buildfarm.
Fixing the tests however uncovered a bug in the code, because sequence
callbacks did not handle skip-empty-xacts properly. For trasactional
increments we need to check/update the xact_wrote_changes flag, and emit
the BEGIN if it's the first change in the transaction.
Reported-by: Andres Freund
Discussion: https://postgr.es/m/
20220212220413.b25amklo7t4xb7ni%40alap3.anarazel.de
(1 row)
-- show results and drop sequence
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------------------------------------------
BEGIN
(1 row)
DROP TABLE test_table;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------------------------------------------------------
BEGIN
sequence public.test_table_a_seq: transactional:0 last_value: 33 log_cnt: 0 is_called:1
sequence public.test_table_a_seq: transactional:0 last_value: 3000 log_cnt: 0 is_called:1
sequence public.test_table_a_seq: transactional:0 last_value: 3033 log_cnt: 0 is_called:1
- BEGIN
- COMMIT
-(8 rows)
+(6 rows)
-- savepoint test on table with serial column
BEGIN;
ROLLBACK TO SAVEPOINT a;
DROP TABLE test_table;
COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------------------------------------
BEGIN
DROP SEQUENCE test_sequence;
COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------------------------------------------
BEGIN
SELECT nextval('test_sequence');
-- show results and drop sequence
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
DROP SEQUENCE test_sequence;
-- rollback on sequence creation and update
SELECT nextval('test_table_a_seq');
DROP TABLE test_table;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- savepoint test on table with serial column
BEGIN;
ROLLBACK TO SAVEPOINT a;
DROP TABLE test_table;
COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- savepoint test on table with serial column
BEGIN;
SELECT * FROM test_sequence;
DROP SEQUENCE test_sequence;
COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
int64 last_value, int64 log_cnt, bool is_called)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
if (!data->include_sequences)
return;
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "sequence ");
appendStringInfoString(ctx->out,
int64 last_value, int64 log_cnt, bool is_called)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
if (!data->include_sequences)
return;
+ /* output BEGIN if we haven't yet, but only for the transactional case */
+ if (transactional)
+ {
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = true;
+ }
+
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "streaming sequence ");
appendStringInfoString(ctx->out,