Fix skip-empty-xacts with sequences in test_decoding
authorTomas Vondra <tomas.vondra@postgresql.org>
Sat, 12 Feb 2022 22:50:42 +0000 (23:50 +0100)
committerTomas Vondra <tomas.vondra@postgresql.org>
Sat, 12 Feb 2022 22:50:42 +0000 (23:50 +0100)
Regression tests need to use skip-empty-xacts = false, because there
might be accidental concurrent activity (like autovacuum), particularly
on slow machines. The tests added by 80901b3291 failed to do that in a
couple places, triggering occasional failures on buildfarm.

Fixing the tests however uncovered a bug in the code, because sequence
callbacks did not handle skip-empty-xacts properly. For trasactional
increments we need to check/update the xact_wrote_changes flag, and emit
the BEGIN if it's the first change in the transaction.

Reported-by: Andres Freund
Discussion: https://postgr.es/m/20220212220413.b25amklo7t4xb7ni%40alap3.anarazel.de

contrib/test_decoding/expected/sequence.out
contrib/test_decoding/sql/sequence.sql
contrib/test_decoding/test_decoding.c

index cf21f3a1f4fcecf90bd23070ddaba59247718f4b..dc1bf670122a79ceb266adab07d1f1e820464610 100644 (file)
@@ -87,7 +87,7 @@ SELECT nextval('test_sequence');
 (1 row)
 
 -- show results and drop sequence
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                           data                                          
 ----------------------------------------------------------------------------------------
  BEGIN
@@ -236,7 +236,7 @@ SELECT nextval('test_table_a_seq');
 (1 row)
 
 DROP TABLE test_table;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                            data                                            
 -------------------------------------------------------------------------------------------
  BEGIN
@@ -245,9 +245,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  sequence public.test_table_a_seq: transactional:0 last_value: 33 log_cnt: 0 is_called:1
  sequence public.test_table_a_seq: transactional:0 last_value: 3000 log_cnt: 0 is_called:1
  sequence public.test_table_a_seq: transactional:0 last_value: 3033 log_cnt: 0 is_called:1
- BEGIN
- COMMIT
-(8 rows)
+(6 rows)
 
 -- savepoint test on table with serial column
 BEGIN;
@@ -259,7 +257,7 @@ INSERT INTO test_table (b) VALUES (300);
 ROLLBACK TO SAVEPOINT a;
 DROP TABLE test_table;
 COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                           data                                           
 -----------------------------------------------------------------------------------------
  BEGIN
@@ -308,7 +306,7 @@ SELECT * FROM test_sequence;
 
 DROP SEQUENCE test_sequence;
 COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                           data                                          
 ----------------------------------------------------------------------------------------
  BEGIN
index 42ad9113bf4c4029aaac446625fd27c97d0358f3..978ee8088e1f39999565753a92a3760042b36b7d 100644 (file)
@@ -28,7 +28,7 @@ SELECT setval('test_sequence', 3500, false);
 SELECT nextval('test_sequence');
 
 -- show results and drop sequence
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 DROP SEQUENCE test_sequence;
 
 -- rollback on sequence creation and update
@@ -87,7 +87,7 @@ SELECT * from test_table_a_seq;
 SELECT nextval('test_table_a_seq');
 
 DROP TABLE test_table;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- savepoint test on table with serial column
 BEGIN;
@@ -99,7 +99,7 @@ INSERT INTO test_table (b) VALUES (300);
 ROLLBACK TO SAVEPOINT a;
 DROP TABLE test_table;
 COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- savepoint test on table with serial column
 BEGIN;
@@ -114,6 +114,6 @@ ROLLBACK TO SAVEPOINT a;
 SELECT * FROM test_sequence;
 DROP SEQUENCE test_sequence;
 COMMIT;
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
index 6b6012e095babf0c5b3c129cb71679e5a5104f59..ea22649e41d2157b554c9992170ea2e829faa733 100644 (file)
@@ -774,10 +774,21 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                   int64 last_value, int64 log_cnt, bool is_called)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
    if (!data->include_sequences)
        return;
 
+   /* output BEGIN if we haven't yet, but only for the transactional case */
+   if (transactional)
+   {
+       if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+       {
+           pg_output_begin(ctx, data, txn, false);
+       }
+       txndata->xact_wrote_changes = true;
+   }
+
    OutputPluginPrepareWrite(ctx, true);
    appendStringInfoString(ctx->out, "sequence ");
    appendStringInfoString(ctx->out,
@@ -994,10 +1005,21 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                          int64 last_value, int64 log_cnt, bool is_called)
 {
    TestDecodingData *data = ctx->output_plugin_private;
+   TestDecodingTxnData *txndata = txn->output_plugin_private;
 
    if (!data->include_sequences)
        return;
 
+   /* output BEGIN if we haven't yet, but only for the transactional case */
+   if (transactional)
+   {
+       if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+       {
+           pg_output_begin(ctx, data, txn, false);
+       }
+       txndata->xact_wrote_changes = true;
+   }
+
    OutputPluginPrepareWrite(ctx, true);
    appendStringInfoString(ctx->out, "streaming sequence ");
    appendStringInfoString(ctx->out,