Allow users to skip logical replication of data having origin.
authorAmit Kapila <akapila@postgresql.org>
Thu, 21 Jul 2022 03:17:38 +0000 (08:47 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 21 Jul 2022 03:17:38 +0000 (08:47 +0530)
This patch adds a new SUBSCRIPTION parameter "origin". It specifies
whether the subscription will request the publisher to only send changes
that don't have an origin or send changes regardless of origin. Setting it
to "none" means that the subscription will request the publisher to only
send changes that have no origin associated. Setting it to "any" means
that the publisher sends changes regardless of their origin. The default
is "any".
Usage:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
PUBLICATION pub1 WITH (origin = none);

This can be used to avoid loops (infinite replication of the same data)
among replication nodes.

This feature allows filtering only the replication data originating from
WAL but for initial sync (initial copy of table data) we don't have such a
facility as we can only distinguish the data based on origin from WAL. As
a follow-up patch, we are planning to forbid the initial sync if the
origin is specified as none and we notice that the publication tables were
also replicated from other publishers to avoid duplicate data or loops.

We forbid to allow creating origin with names 'none' and 'any' to avoid
confusion with the same name options.

Author: Vignesh C, Amit Kapila
Reviewed-By: Peter Smith, Amit Kapila, Dilip Kumar, Shi yu, Ashutosh Bapat, Hayato Kuroda
Discussion: https://postgr.es/m/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com

24 files changed:
contrib/test_decoding/expected/replorigin.out
contrib/test_decoding/sql/replorigin.sql
doc/src/sgml/catalogs.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/origin.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/pg_dump/t/002_pg_dump.pl
src/bin/psql/describe.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/include/replication/pgoutput.h
src/include/replication/walreceiver.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/030_origin.pl [new file with mode: 0644]

index 2e9ef7c823b4ed9e4fb777d05bb2e7c355a7fa2f..49ffaeea2da8bafaceeab811b7ca658779b4ea03 100644 (file)
@@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 ERROR:  replication origin "regress_test_decoding: temp" does not exist
+-- specifying reserved origin names is not supported
+SELECT pg_replication_origin_create('any');
+ERROR:  replication origin name "any" is reserved
+DETAIL:  Origin names "any", "none", and names starting with "pg_" are reserved.
+SELECT pg_replication_origin_create('none');
+ERROR:  replication origin name "none" is reserved
+DETAIL:  Origin names "any", "none", and names starting with "pg_" are reserved.
+SELECT pg_replication_origin_create('pg_replication_origin');
+ERROR:  replication origin name "pg_replication_origin" is reserved
+DETAIL:  Origin names "any", "none", and names starting with "pg_" are reserved.
 -- various failure checks for undefined slots
 select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
 ERROR:  replication origin "regress_test_decoding: temp" does not exist
index 2e28a48777313aa4e35d0aff9490288f044d1d55..db06541f56559d9ee2cccd976fd5fe29e05a550b 100644 (file)
@@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
 
+-- specifying reserved origin names is not supported
+SELECT pg_replication_origin_create('any');
+SELECT pg_replication_origin_create('none');
+SELECT pg_replication_origin_create('pg_replication_origin');
+
 -- various failure checks for undefined slots
 select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
 select pg_replication_origin_session_setup('regress_test_decoding: temp');
index 670a5406d61835469a6c92611b699467fe4bd51e..a186e35f0095a1baf31d776ac1aa4dd331cc39c1 100644 (file)
@@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        see <xref linkend="logical-replication-publication"/>.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>suborigin</structfield> <type>text</type>
+      </para>
+      <para>
+       The origin value must be either <literal>none</literal> or
+       <literal>any</literal>. The default is <literal>any</literal>.
+       If <literal>none</literal>, the subscription will request the publisher
+       to only send changes that don't have an origin. If
+       <literal>any</literal>, the publisher sends changes regardless of their
+       origin.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 353ea5def231ab9f8d10d914b1a0234b308ba858..64efc21f53769dd496155d5931ba3d7a88d65aa8 100644 (file)
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, <literal>streaming</literal>, and
-      <literal>disable_on_error</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>,
+      <literal>disable_on_error</literal>, and
+      <literal>origin</literal>.
      </para>
     </listitem>
    </varlistentry>
index 34b3264b261cdee8ff51f6de30a4b91ccf3fe3a6..7390c715bc30b72cd53cd49008303504e7d2d835 100644 (file)
@@ -302,6 +302,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>origin</literal> (<type>string</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to only
+          send changes that don't have an origin or send changes regardless of
+          origin. Setting <literal>origin</literal> to <literal>none</literal>
+          means that the subscription will request the publisher to only send
+          changes that don't have an origin. Setting <literal>origin</literal>
+          to <literal>any</literal> means that the publisher sends changes
+          regardless of their origin. The default is <literal>any</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
index 8856ce3b5017c76e2ac4112285283658693df693..33ae3da8aeb80709772832a49afcae932b4e70f4 100644 (file)
@@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok)
    Assert(!isnull);
    sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
 
+   /* Get origin */
+   datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+                           tup,
+                           Anum_pg_subscription_suborigin,
+                           &isnull);
+   Assert(!isnull);
+   sub->origin = TextDatumGetCString(datum);
+
    ReleaseSysCache(tup);
 
    return sub;
index fedaed533b910842b07765861f1864145b443f42..f369b1fc141ccbaf20fc3c6ca8426f702b12eca4 100644 (file)
@@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
-              subsynccommit, subpublications)
+              subbinary, substream, subtwophasestate, subdisableonerr,
+              subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
index bdc1208724113196b5c3234140ac7ef283589477..bd0cc0848d70eeffc214ca80d255f64790621c6a 100644 (file)
@@ -64,6 +64,7 @@
 #define SUBOPT_TWOPHASE_COMMIT     0x00000200
 #define SUBOPT_DISABLE_ON_ERR      0x00000400
 #define SUBOPT_LSN                 0x00000800
+#define SUBOPT_ORIGIN              0x00001000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
    bool        streaming;
    bool        twophase;
    bool        disableonerr;
+   char       *origin;
    XLogRecPtr  lsn;
 } SubOpts;
 
@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
           IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
                 SUBOPT_COPY_DATA));
 
-   /* Set default values for the boolean supported options. */
+   /* Set default values for the supported options. */
    if (IsSet(supported_opts, SUBOPT_CONNECT))
        opts->connect = true;
    if (IsSet(supported_opts, SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
        opts->twophase = false;
    if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
        opts->disableonerr = false;
+   if (IsSet(supported_opts, SUBOPT_ORIGIN))
+       opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
    /* Parse options */
    foreach(lc, stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
            opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
            opts->disableonerr = defGetBoolean(defel);
        }
+       else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
+                strcmp(defel->defname, "origin") == 0)
+       {
+           if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
+               errorConflictingDefElem(defel, pstate);
+
+           opts->specified_opts |= SUBOPT_ORIGIN;
+           pfree(opts->origin);
+
+           /*
+            * Even though the "origin" parameter allows only "none" and "any"
+            * values, it is implemented as a string type so that the
+            * parameter can be extended in future versions to support
+            * filtering using origin names specified by the user.
+            */
+           opts->origin = defGetString(defel);
+
+           if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
+               (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
+               ereport(ERROR,
+                       errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                       errmsg("unrecognized origin value: \"%s\"", opts->origin));
+       }
        else if (IsSet(supported_opts, SUBOPT_LSN) &&
                 strcmp(defel->defname, "lsn") == 0)
        {
@@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                      SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
                      SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
                      SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-                     SUBOPT_DISABLE_ON_ERR);
+                     SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
    parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
    /*
@@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        CStringGetTextDatum(opts.synchronous_commit);
    values[Anum_pg_subscription_subpublications - 1] =
        publicationListToArray(publications);
+   values[Anum_pg_subscription_suborigin - 1] =
+       CStringGetTextDatum(opts.origin);
 
    tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
            {
                supported_opts = (SUBOPT_SLOT_NAME |
                                  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-                                 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+                                 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+                                 SUBOPT_ORIGIN);
 
                parse_subscription_options(pstate, stmt->options,
                                           supported_opts, &opts);
@@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                        = true;
                }
 
+               if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
+               {
+                   values[Anum_pg_subscription_suborigin - 1] =
+                       CStringGetTextDatum(opts.origin);
+                   replaces[Anum_pg_subscription_suborigin - 1] = true;
+               }
+
                update_tuple = true;
                break;
            }
index 0b775b1e9853e8cb5c6d567ac2c4f8f9007dbd78..da9c359af103e4f07444ce5f12be203aa1141784 100644 (file)
@@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
            PQserverVersion(conn->streamConn) >= 150000)
            appendStringInfoString(&cmd, ", two_phase 'on'");
 
+       if (options->proto.logical.origin &&
+           PQserverVersion(conn->streamConn) >= 160000)
+           appendStringInfo(&cmd, ", origin '%s'",
+                            options->proto.logical.origin);
+
        pubnames = options->proto.logical.publication_names;
        pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
        if (!pubnames_str)
index 21937ab2d3159d8fab6f1e2cd733d3961889f09b..c72ad6b93de8770fbbda36cb37d452101b0a55ac 100644 (file)
@@ -77,6 +77,7 @@
 #include "access/xloginsert.h"
 #include "catalog/catalog.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_subscription.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "nodes/execnodes.h"
@@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
 }
 
 
+/*
+ * IsReservedOriginName
+ *     True iff name is either "none" or "any".
+ */
+static bool
+IsReservedOriginName(const char *name)
+{
+   return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
+           (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
+}
+
 /* ---------------------------------------------------------------------------
  * Functions for working with replication origins themselves.
  * ---------------------------------------------------------------------------
@@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
 
    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 
-   /* Replication origins "pg_xxx" are reserved for internal use */
-   if (IsReservedName(name))
+   /*
+    * Replication origins "any and "none" are reserved for system options.
+    * The origins "pg_xxx" are reserved for internal use.
+    */
+   if (IsReservedName(name) || IsReservedOriginName(name))
        ereport(ERROR,
                (errcode(ERRCODE_RESERVED_NAME),
                 errmsg("replication origin name \"%s\" is reserved",
                        name),
-                errdetail("Origin names starting with \"pg_\" are reserved.")));
+                errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
+                          LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
 
    /*
     * If built with appropriate switch, whine when regression-testing
index 38e3b1c1b3c0ab646ceecd2fe6652975538ea350..5f8c5417630a5344b93211cdc75a89041a65d2e0 100644 (file)
@@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
        strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
        newsub->binary != MySubscription->binary ||
        newsub->stream != MySubscription->stream ||
+       strcmp(newsub->origin, MySubscription->origin) != 0 ||
        newsub->owner != MySubscription->owner ||
        !equal(newsub->publications, MySubscription->publications))
    {
@@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
    options.proto.logical.binary = MySubscription->binary;
    options.proto.logical.streaming = MySubscription->stream;
    options.proto.logical.twophase = false;
+   options.proto.logical.origin = pstrdup(MySubscription->origin);
 
    if (!am_tablesync_worker())
    {
index ba8a24d0999fa598351b4321e037d528203c862e..a3c1ba8a4021cd687c03eb071ea1ce4fd74f9386 100644 (file)
@@ -16,6 +16,7 @@
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_subscription.h"
 #include "commands/defrem.h"
 #include "executor/executor.h"
 #include "fmgr.h"
@@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 static bool in_streaming;
+static bool publish_no_origin;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data)
    bool        messages_option_given = false;
    bool        streaming_given = false;
    bool        two_phase_option_given = false;
+   bool        origin_option_given = false;
 
    data->binary = false;
    data->streaming = false;
@@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data)
 
            data->two_phase = defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "origin") == 0)
+       {
+           if (origin_option_given)
+               ereport(ERROR,
+                       errcode(ERRCODE_SYNTAX_ERROR),
+                       errmsg("conflicting or redundant options"));
+           origin_option_given = true;
+
+           data->origin = defGetString(defel);
+           if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
+               publish_no_origin = true;
+           else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
+               publish_no_origin = false;
+           else
+               ereport(ERROR,
+                       errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                       errmsg("unrecognized origin value: \"%s\"", data->origin));
+       }
        else
            elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
    }
@@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
- * Currently we always forward.
+ * Return true if the data is associated with an origin and the user has
+ * requested the changes that don't have an origin, false otherwise.
  */
 static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
                       RepOriginId origin_id)
 {
+   if (publish_no_origin && origin_id != InvalidRepOriginId)
+       return true;
+
    return false;
 }
 
index e4fdb6b75b067e1c76323110795db884ed345cd9..f9c51d1e679c55c169900b16f12aef0d5b089e9c 100644 (file)
@@ -4412,6 +4412,7 @@ getSubscriptions(Archive *fout)
    int         i_substream;
    int         i_subtwophasestate;
    int         i_subdisableonerr;
+   int         i_suborigin;
    int         i_subconninfo;
    int         i_subslotname;
    int         i_subsynccommit;
@@ -4461,13 +4462,18 @@ getSubscriptions(Archive *fout)
    if (fout->remoteVersion >= 150000)
        appendPQExpBufferStr(query,
                             " s.subtwophasestate,\n"
-                            " s.subdisableonerr\n");
+                            " s.subdisableonerr,\n");
    else
        appendPQExpBuffer(query,
                          " '%c' AS subtwophasestate,\n"
-                         " false AS subdisableonerr\n",
+                         " false AS subdisableonerr,\n",
                          LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+   if (fout->remoteVersion >= 160000)
+       appendPQExpBufferStr(query, " s.suborigin\n");
+   else
+       appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+
    appendPQExpBufferStr(query,
                         "FROM pg_subscription s\n"
                         "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4493,6 +4499,7 @@ getSubscriptions(Archive *fout)
    i_substream = PQfnumber(res, "substream");
    i_subtwophasestate = PQfnumber(res, "subtwophasestate");
    i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+   i_suborigin = PQfnumber(res, "suborigin");
 
    subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4522,6 +4529,7 @@ getSubscriptions(Archive *fout)
            pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
        subinfo[i].subdisableonerr =
            pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+       subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 
        /* Decide whether we want to dump it */
        selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4595,6 +4603,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
    if (strcmp(subinfo->subdisableonerr, "t") == 0)
        appendPQExpBufferStr(query, ", disable_on_error = true");
 
+   if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
+       appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
+
    if (strcmp(subinfo->subsynccommit, "off") != 0)
        appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
index 1d21c2906f114a7b7f5f6b0ef64328a12b97ed96..69ee939d4497528ff05b69b1af9c550a368af876 100644 (file)
@@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
    char       *substream;
    char       *subtwophasestate;
    char       *subdisableonerr;
+   char       *suborigin;
    char       *subsynccommit;
    char       *subpublications;
 } SubscriptionInfo;
index 1f08716f690a14e1caa3b642759e8ecd629ffa69..b10e1c4c0d4d27bf1a427c8cfe36f8dea4dad78a 100644 (file)
@@ -2465,6 +2465,28 @@ my %tests = (
        like => { %full_runs, section_post_data => 1, },
    },
 
+   'CREATE SUBSCRIPTION sub2' => {
+       create_order => 50,
+       create_sql   => 'CREATE SUBSCRIPTION sub2
+                        CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+                        WITH (connect = false, origin = none);',
+       regexp => qr/^
+           \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+           /xm,
+       like => { %full_runs, section_post_data => 1, },
+   },
+
+   'CREATE SUBSCRIPTION sub3' => {
+       create_order => 50,
+       create_sql   => 'CREATE SUBSCRIPTION sub3
+                        CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+                        WITH (connect = false, origin = any);',
+       regexp => qr/^
+           \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+           /xm,
+       like => { %full_runs, section_post_data => 1, },
+   },
+
    'ALTER PUBLICATION pub1 ADD TABLE test_table' => {
        create_order => 51,
        create_sql =>
index 1938e1d6ec8d4da097b056be5b28f9fcde553620..327a69487bb9d22adeee4c87fa3842fc19b0cd03 100644 (file)
@@ -6469,7 +6469,7 @@ describeSubscriptions(const char *pattern, bool verbose)
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
    static const bool translate_columns[] = {false, false, false, false,
-   false, false, false, false, false, false, false};
+   false, false, false, false, false, false, false, false};
 
    if (pset.sversion < 100000)
    {
@@ -6511,6 +6511,11 @@ describeSubscriptions(const char *pattern, bool verbose)
                              gettext_noop("Two-phase commit"),
                              gettext_noop("Disable on error"));
 
+       if (pset.sversion >= 160000)
+           appendPQExpBuffer(&buf,
+                             ", suborigin AS \"%s\"\n",
+                             gettext_noop("Origin"));
+
        appendPQExpBuffer(&buf,
                          ",  subsynccommit AS \"%s\"\n"
                          ",  subconninfo AS \"%s\"\n",
index e572f585ef8312ebab48948bffc3d3699faf1db1..92207d2e160aa00444567e4f2cf2fc428277999c 100644 (file)
@@ -1873,7 +1873,8 @@ psql_completion(const char *text, int start, int end)
        COMPLETE_WITH("(", "PUBLICATION");
    /* ALTER SUBSCRIPTION <name> SET ( */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-       COMPLETE_WITH("binary", "disable_on_error", "slot_name", "streaming", "synchronous_commit");
+       COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+                     "streaming", "synchronous_commit");
    /* ALTER SUBSCRIPTION <name> SKIP ( */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
        COMPLETE_WITH("lsn");
@@ -3152,8 +3153,8 @@ psql_completion(const char *text, int start, int end)
    /* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
    else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
        COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-                     "disable_on_error", "enabled", "slot_name", "streaming",
-                     "synchronous_commit", "two_phase");
+                     "disable_on_error", "enabled", "origin", "slot_name",
+                     "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
index c27fe0fcd8172cbcb2b2379f1a403428bd30543e..90379e05cbc7416f74449eadac4477366bc9245e 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202207201
+#define CATALOG_VERSION_NO 202207211
 
 #endif
index d1260f590cf51f91bc7e3dcf462a6ec76cde2978..c9a3026b2832032f5ccd5ad7be4b786b57050674 100644 (file)
 #define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
 #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
 
+/*
+ * The subscription will request the publisher to only send changes that do not
+ * have any origin.
+ */
+#define LOGICALREP_ORIGIN_NONE "none"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
 /* ----------------
  *     pg_subscription definition. cpp turns this into
  *     typedef struct FormData_pg_subscription
@@ -87,6 +99,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
    /* List of publications subscribed to */
    text        subpublications[1] BKI_FORCE_NOT_NULL;
+
+   /* Only publish data originating from the specified origin */
+   text        suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
 #endif
 } FormData_pg_subscription;
 
@@ -118,6 +133,8 @@ typedef struct Subscription
    char       *slotname;       /* Name of the replication slot */
    char       *synccommit;     /* Synchronous commit setting for worker */
    List       *publications;   /* List of publication names to subscribe to */
+   char       *origin;         /* Only publish data originating from the
+                                * specified origin */
 } Subscription;
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
index eafedd610a50b2096fd1a74e18317cb3b6a9880c..02027550e25a5cbbd73b82b6eea0017c2e23bc6f 100644 (file)
@@ -29,6 +29,7 @@ typedef struct PGOutputData
    bool        streaming;
    bool        messages;
    bool        two_phase;
+   char       *origin;
 } PGOutputData;
 
 #endif                         /* PGOUTPUT_H */
index 81184aa92f3b51ae72c9b0f4bac8814ece3f20a7..88d7cc6abcbd9b1c190252cdd8a95355b3448d7e 100644 (file)
@@ -183,6 +183,8 @@ typedef struct
            bool        streaming;  /* Streaming of large transactions */
            bool        twophase;   /* Streaming of two-phase transactions at
                                     * prepare time */
+           char       *origin; /* Only publish data originating from the
+                                * specified origin */
        }           logical;
    }           proto;
 } WalRcvStreamOptions;
index 5db7146e0617f19633431a14f80d53b9cf49eb09..ef0ebf96b9017a2b134c7f230a53238348a35311 100644 (file)
@@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- fail - origin must be either none or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
+ERROR:  unrecognized origin value: "foo"
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+ regress_testsub4
+                                                                                         List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+                                                                                         List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -96,10 +118,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                             List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                             List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                           List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                               List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -179,19 +201,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -202,19 +224,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                            List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -247,10 +269,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -284,10 +306,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -296,10 +318,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -323,18 +345,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
index 74c38ead5d6d729a2ec3814631f2b44d870d6ef6..4425fafc46c2f1718666e77f883aee5a0a68504b 100644 (file)
@@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- fail - origin must be either none or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
+\dRs+ regress_testsub4
+ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
new file mode 100644 (file)
index 0000000..e9241d2
--- /dev/null
@@ -0,0 +1,155 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test the CREATE SUBSCRIPTION 'origin' parameter.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################################################################
+# Setup a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->start;
+
+# Create table on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Create the same table on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
+my $appname_B1 = 'tap_sub_B1';
+$node_B->safe_psql(
+   'postgres', "
+   CREATE SUBSCRIPTION tap_sub_B1
+   CONNECTION '$node_A_connstr application_name=$appname_B1'
+   PUBLICATION tap_pub_A
+   WITH (origin = none)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql(
+   'postgres', "
+   CREATE SUBSCRIPTION tap_sub_A
+   CONNECTION '$node_B_connstr application_name=$appname_A'
+   PUBLICATION tap_pub_B
+   WITH (origin = none, copy_data = off)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1, 1, 'Bidirectional replication setup is complete');
+
+my $result;
+
+###############################################################################
+# Check that bidirectional logical replication setup does not cause infinite
+# recursive insertion.
+###############################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is( $result, qq(11
+21),
+   'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is( $result, qq(11
+21),
+   'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+
+$node_A->safe_psql('postgres', "DELETE FROM tab;");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+###############################################################################
+# Check that remote data of node_B (that originated from node_C) is not
+# published to node_A.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+# Initialize node node_C
+my $node_C = PostgreSQL::Test::Cluster->new('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->start;
+
+$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_C (pub) -> node_B (sub)
+my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
+$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
+
+my $appname_B2 = 'tap_sub_B2';
+$node_B->safe_psql(
+   'postgres', "
+   CREATE SUBSCRIPTION tap_sub_B2
+   CONNECTION '$node_C_connstr application_name=$appname_B2'
+   PUBLICATION tap_pub_C
+   WITH (origin = none)");
+
+$node_C->wait_for_catchup($appname_B2);
+
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert a record
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+
+$node_C->wait_for_catchup($appname_B2);
+$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($appname_B1);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(32), 'The node_C data replicated to node_B');
+
+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(),
+   'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
+);
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+$node_C->stop('fast');
+
+done_testing();