Add STREAM_START/STREAM_STOP for transactional messages during decoding.
authorAmit Kapila <akapila@postgresql.org>
Mon, 30 Oct 2023 09:06:21 +0000 (14:36 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 30 Oct 2023 09:06:21 +0000 (14:36 +0530)
In test_decoding module, when skip_empty_xacts option was specified, add
stream_start/stop for streaming transactional messages. This makes the
handling of transactional messages stream consistent irrespective of
whether skip_empty_xacts option was specified.

Commit 26dd0284b9 made a similar change for non-streaming messages but
forgot to update the streaming cases.

Author: Peter Smith
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com

contrib/test_decoding/expected/stream.out
contrib/test_decoding/expected/twophase_stream.out
contrib/test_decoding/test_decoding.c

index 0f21dcb8e0e442c14079ea99426b0d3a27f952b5..4ab2d47bf8d3c33d3a2420ee5aca79e70a291a11 100644 (file)
@@ -29,7 +29,10 @@ COMMIT;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
-(24 rows)
+(27 rows)
 
 -- streaming test for toast changes
 ALTER TABLE stream_test ALTER COLUMN data set storage external;
index b08bb0e5730be8decc0eef3e7314766520aead6f..a3574f73c8e85e1f9844690ea3596b06af410fbd 100644 (file)
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  preparing streamed transaction 'test1'
-(24 rows)
+(27 rows)
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
+ opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
-(1 row)
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+(4 rows)
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
index ab870d9e4dc505a58a50a7dd49c8a556fec62c09..288fd0bb4abc7792cb4141c0a8ef2208996e6498 100644 (file)
@@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
                                                 const char *prefix, Size sz, const char *message)
 {
+       /* Output stream start if we haven't yet for transactional messages. */
+       if (transactional)
+       {
+               TestDecodingData *data = ctx->output_plugin_private;
+               TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+               if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
+               {
+                       pg_output_stream_start(ctx, data, txn, false);
+               }
+               txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
+       }
+
        OutputPluginPrepareWrite(ctx, true);
 
        if (transactional)