Add 'logical_decoding_mode' GUC.
authorAmit Kapila <akapila@postgresql.org>
Mon, 26 Dec 2022 03:28:16 +0000 (08:58 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 26 Dec 2022 03:28:16 +0000 (08:58 +0530)
This enables streaming or serializing changes immediately in logical
decoding. This parameter is intended to be used to test logical decoding
and replication of large transactions for which otherwise we need to
generate the changes till logical_decoding_work_mem is reached.

This helps in reducing the timing of existing tests related to logical
replication of in-progress transactions and will help in writing tests for
for the upcoming feature for parallelly applying large in-progress
transactions.

Author: Shi yu
Reviewed-by: Sawada Masahiko, Shveta Mallik, Amit Kapila, Dilip Kumar, Kuroda Hayato, Kyotaro Horiguchi
Discussion: https://postgr.es/m/OSZPR01MB63104E7449DBE41932DB19F1FD1B9@OSZPR01MB6310.jpnprd01.prod.outlook.com

doc/src/sgml/config.sgml
src/backend/replication/logical/reorderbuffer.c
src/backend/utils/misc/guc_tables.c
src/include/replication/reorderbuffer.h
src/test/subscription/t/016_stream_subxact.pl
src/test/subscription/t/018_stream_subxact_abort.pl
src/test/subscription/t/019_stream_subxact_ddl_abort.pl
src/test/subscription/t/023_twophase_stream.pl
src/tools/pgindent/typedefs.list

index 9eedab652df64ec7a3d6e5af9a842a93ff5f249d..3071c8eace4d5e49d7c4c27eb44c63c811462312 100644 (file)
@@ -11597,6 +11597,34 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode">
+      <term><varname>logical_decoding_mode</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows streaming or serializing changes immediately in logical decoding.
+        The allowed values of <varname>logical_decoding_mode</varname> are
+        <literal>buffered</literal> and <literal>immediate</literal>. When set
+        to <literal>immediate</literal>, stream each change if
+        <literal>streaming</literal> option (see optional parameters set by
+        <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
+        is enabled, otherwise, serialize each change.  When set to
+        <literal>buffered</literal>, which is the default, decoding will stream
+        or serialize changes when <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+       <para>
+        This parameter is intended to be used to test logical decoding and
+        replication of large transactions for which otherwise we need to
+        generate the changes till <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
   </sect1>
   <sect1 id="runtime-config-short">
index b567b8b59e21a3533d66a697b4753f94b33fe4cf..92204bd9cdfc67e8715fa7a0d6cba82e0deefc7a 100644 (file)
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
 int            logical_decoding_work_mem;
 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
+/* GUC variable */
+int            logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send to the output plugin until we reach under the memory limit.
+ *
+ * If logical_decoding_mode is set to "immediate", stream or serialize the changes
+ * immediately.
  *
  * XXX At this point we select the transactions until we reach under the memory
  * limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
    ReorderBufferTXN *txn;
 
-   /* bail out if we haven't exceeded the memory limit */
-   if (rb->size < logical_decoding_work_mem * 1024L)
+   /*
+    * Bail out if logical_decoding_mode is buffered and we haven't exceeded
+    * the memory limit.
+    */
+   if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
+       rb->size < logical_decoding_work_mem * 1024L)
        return;
 
    /*
-    * Loop until we reach under the memory limit.  One might think that just
-    * by evicting the largest (sub)transaction we will come under the memory
-    * limit based on assumption that the selected transaction is at least as
-    * large as the most recent change (which caused us to go over the memory
-    * limit). However, that is not true because a user can reduce the
-    * logical_decoding_work_mem to a smaller value before the most recent
+    * If logical_decoding_mode is immediate, loop until there's no change.
+    * Otherwise, loop until we reach under the memory limit. One might think
+    * that just by evicting the largest (sub)transaction we will come under
+    * the memory limit based on assumption that the selected transaction is
+    * at least as large as the most recent change (which caused us to go over
+    * the memory limit). However, that is not true because a user can reduce
+    * the logical_decoding_work_mem to a smaller value before the most recent
     * change.
     */
-   while (rb->size >= logical_decoding_work_mem * 1024L)
+   while (rb->size >= logical_decoding_work_mem * 1024L ||
+          (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
+           rb->size > 0))
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
index 436afe1d2150c72c6e2a9708fb955c65ca6a7505..a37c9f98444f617a3ad75ebecb8f61ccaf444cb4 100644 (file)
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
    {NULL, 0, false}
 };
 
+static const struct config_enum_entry logical_decoding_mode_options[] = {
+   {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
+   {"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
+   {NULL, 0, false}
+};
+
 StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
                 "array length mismatch");
 
@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+           gettext_noop("Allows streaming or serializing each change in logical decoding."),
+           NULL,
+           GUC_NOT_IN_SAMPLE
+       },
+       &logical_decoding_mode,
+       LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options,
+       NULL, NULL, NULL
+   },
+
    /* End-of-list marker */
    {
        {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
index c700b55b1c0a42ac62da581cdffc11b64770529f..b27a43618a1640794dac4efba67c3aad3dd7be9a 100644 (file)
 #include "utils/timestamp.h"
 
 extern PGDLLIMPORT int logical_decoding_work_mem;
+extern PGDLLIMPORT int logical_decoding_mode;
+
+/* possible values for logical_decoding_mode */
+typedef enum
+{
+   LOGICAL_DECODING_MODE_BUFFERED,
+   LOGICAL_DECODING_MODE_IMMEDIATE
+} LogicalDecodingMode;
 
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
index bc0a9cd0531a4b9c56ff9fda79a5c22d1022a0ea..db29f089a04162d6836c61b5556eaafa40ce5a45 100644 (file)
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing large subtransactions
+# Test streaming of transaction containing subtransactions
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-   'logical_decoding_work_mem = 64kB');
+   'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,27 +49,27 @@ my $result =
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
 is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 
-# Insert, update and delete enough rows to exceed 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
    'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(    3,  500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,  1000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,  1500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,  2000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 COMMIT;
@@ -80,7 +80,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(1667|1667|1667),
+is($result, qq(12|12|12),
    'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
 );
 
index 551f16df6ddbc9d108559fe3facfae1b895c3bef..1458c3a0fca5507d3b93ed7514dd771f5b4ca60b 100644 (file)
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing multiple subtransactions and rollbacks
+# Test streaming of transaction containing multiple subtransactions and rollbacks
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-   'logical_decoding_work_mem = 64kB');
+   'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -48,25 +48,25 @@ my $result =
    "SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
    'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text));
 ROLLBACK TO s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+INSERT INTO test_tab VALUES (7, md5(7::text));
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+INSERT INTO test_tab VALUES (8, md5(8::text));
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
+INSERT INTO test_tab VALUES (9, md5(9::text));
 SAVEPOINT s5;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
+INSERT INTO test_tab VALUES (10, md5(10::text));
 COMMIT;
 });
 
@@ -75,24 +75,24 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2000|0),
+is($result, qq(6|0),
    'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
 
-# large (streamed) transaction with subscriber receiving out of order
-# subtransaction ROLLBACKs
+# streamed transaction with subscriber receiving out of order subtransaction
+# ROLLBACKs
 $node_publisher->safe_psql(
    'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
+INSERT INTO test_tab VALUES (11, md5(11::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
+INSERT INTO test_tab VALUES (12, md5(12::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
+INSERT INTO test_tab VALUES (13, md5(13::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
+INSERT INTO test_tab VALUES (14, md5(14::text));
 RELEASE s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
+INSERT INTO test_tab VALUES (15, md5(15::text));
 ROLLBACK TO s1;
 COMMIT;
 });
@@ -102,18 +102,18 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0),
+is($result, qq(7|0),
    'check rollback to savepoint was reflected on subscriber');
 
-# large (streamed) transaction with subscriber receiving rollback
+# streamed transaction with subscriber receiving rollback
 $node_publisher->safe_psql(
    'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
+INSERT INTO test_tab VALUES (16, md5(16::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
+INSERT INTO test_tab VALUES (17, md5(17::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
+INSERT INTO test_tab VALUES (18, md5(18::text));
 ROLLBACK;
 });
 
@@ -122,7 +122,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0), 'check rollback was reflected on subscriber');
+is($result, qq(7|0), 'check rollback was reflected on subscriber');
 
 $node_subscriber->stop;
 $node_publisher->stop;
index 4d7da82b7a8021582b43318a099d8682793913b0..c6719c1af8c0e68df3717e8e1f8edbb10dd8ecda 100644 (file)
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
+# Test streaming of transaction with subtransactions, DDLs, DMLs, and
 # rollbacks
 use strict;
 use warnings;
@@ -13,7 +13,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-   'logical_decoding_work_mem = 64kB');
+   'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,23 +49,23 @@ my $result =
    "SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
    'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 ALTER TABLE test_tab ADD COLUMN c INT;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
 ALTER TABLE test_tab ADD COLUMN d INT;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text), -5, 5*2);
 ALTER TABLE test_tab ADD COLUMN e INT;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text), -6, 6*2, -6*3);
 ALTER TABLE test_tab DROP COLUMN c;
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), 4);
 COMMIT;
 });
 
@@ -74,7 +74,7 @@ $node_publisher->wait_for_catchup($appname);
 $result =
   $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(1000|500),
+is($result, qq(4|1),
    'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
 
index 9b454106bdf3f20a6e77815785017a2ba4967eff..a191129b9dc80b01dc13ec484ee2f4e2af6dae19 100644 (file)
@@ -18,7 +18,7 @@ $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf(
    'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
+logical_decoding_mode = immediate
 ));
 $node_publisher->start;
 
@@ -80,11 +80,11 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 ###############################
 
 # check that 2PC gets replicated to subscriber
-# Insert, update and delete enough rows to exceed the 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
    'postgres', q{
    BEGIN;
-   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
    UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
    DELETE FROM test_tab WHERE mod(a,3) = 0;
    PREPARE TRANSACTION 'test_prepared_tab';});
@@ -105,7 +105,7 @@ $node_publisher->wait_for_catchup($appname);
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
    'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 $result = $node_subscriber->safe_psql('postgres',
@@ -124,11 +124,11 @@ is($result, qq(0), 'transaction is committed on subscriber');
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
    'postgres', q{
    BEGIN;
-   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
    UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
    DELETE FROM test_tab WHERE mod(a,3) = 0;
    PREPARE TRANSACTION 'test_prepared_tab';});
@@ -158,7 +158,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 
 ###############################
 # Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
-# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 1. insert, update and delete some rows.
 # 2. Then server crashes before the 2PC transaction is committed.
 # 3. After servers are restarted the pending transaction is committed.
 #
@@ -169,7 +169,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 $node_publisher->safe_psql(
    'postgres', q{
    BEGIN;
-   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
    UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
    DELETE FROM test_tab WHERE mod(a,3) = 0;
    PREPARE TRANSACTION 'test_prepared_tab';});
@@ -188,7 +188,7 @@ $node_publisher->wait_for_catchup($appname);
 # check inserts are visible
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
    'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 
@@ -206,11 +206,11 @@ is($result, qq(3334|3334|3334),
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
    'postgres', q{
    BEGIN;
-   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
    UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
    DELETE FROM test_tab WHERE mod(a,3) = 0;
    PREPARE TRANSACTION 'test_prepared_tab';});
@@ -257,11 +257,11 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
    'postgres', q{
    BEGIN;
-   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
    UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
    DELETE FROM test_tab WHERE mod(a,3) = 0;
    PREPARE TRANSACTION 'test_prepared_tab';});
@@ -287,7 +287,7 @@ $node_publisher->wait_for_catchup($appname);
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3335|3335|3335),
+is($result, qq(5|5|5),
    'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
 );
 
index 60c71d05fe19efa5fdffa99e32d25124158ab57c..50d86cb01bd1d51c1b9ba2dffd3948c960664672 100644 (file)
@@ -1456,6 +1456,7 @@ LogicalDecodeStreamStopCB
 LogicalDecodeStreamTruncateCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext
+LogicalDecodingMode
 LogicalErrorCallbackState
 LogicalOutputPluginInit
 LogicalOutputPluginWriterPrepareWrite