Optionally disable subscriptions on error.
author Amit Kapila <akapila@postgresql.org>
Mon, 14 Mar 2022 04:02:40 +0000 (09:32 +0530)
committer Amit Kapila <akapila@postgresql.org>
Mon, 14 Mar 2022 04:02:40 +0000 (09:32 +0530)
Logical replication apply workers for a subscription can easily get stuck
in an infinite loop of attempting to apply a change, triggering an error
(such as a constraint violation), exiting with the error written to the
subscription server log, and restarting.

To partially remedy the situation, this patch adds a new subscription
option named 'disable_on_error'. To be consistent with old behavior, this
option defaults to false. When true, both the tablesync worker and apply
worker catch any errors thrown and disable the subscription in order to
break the loop. The error is still written to the logs as well.

Once the subscription is disabled, users can either manually resolve the
conflict/error or skip the conflicting transaction by using the
pg_replication_origin_advance() function. After resolving the conflict,
users need to enable the subscription to allow the apply process to proceed.

Author: Osumi Takamichi and Mark Dilger
Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu
Discussion: https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com

17 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/worker.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/029_disable_on_error.pl [new file with mode: 0644]

index 83987a9904578b47eca3ee7ae7d225c0aa999778..7777d605142b67c5d69eda67c92395a8114ded63 100644 (file)
@@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subdisableonerr</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription will be disabled if one of its workers
+       detects an error
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
index 82326c3901934d6d3f3f4aecddf87dfbb7da7f9d..6431d4796db3af5f71d6b0a13aff75ae360d2acc 100644 (file)
@@ -364,8 +364,9 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    the replication origin name can be found from the server log (LSN 0/14C0378 and
    replication origin <literal>pg_16395</literal> in the above case).  To skip the
    transaction, the subscription needs to be disabled temporarily by
-   <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
-   can be skipped by calling the
+   <command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
+   subscription can be used with the <literal>disable_on_error</literal> option.
+   Then, the transaction can be skipped by calling the
    <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
    the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
index 0d6f064f58d74ef0ef31c36782b68ca3bca7bb55..58b78a94eabe29eae586938a50ed2e2fe7e75ce7 100644 (file)
@@ -204,8 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
-      <literal>streaming</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>, and
+      <literal>disable_on_error</literal>.
      </para>
     </listitem>
    </varlistentry>
index e80a2617a34fb7d1216cabc3ebb7dedb156ee815..b701752fc9b2131293546ad7c8f6ff30aaecba52 100644 (file)
@@ -290,6 +290,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
 
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should be automatically disabled
+          if any errors are detected by subscription workers during data
+          replication from the publisher. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
 
index ca65a8bd201a326d40daa5eb60ae5b24f66d83b0..a6304f5f81a22c59f451769bfc09e2167eba4210 100644 (file)
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
    sub->binary = subform->subbinary;
    sub->stream = subform->substream;
    sub->twophasestate = subform->subtwophasestate;
+   sub->disableonerr = subform->subdisableonerr;
 
    /* Get conninfo */
    datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -156,6 +157,45 @@ FreeSubscription(Subscription *sub)
    pfree(sub);
 }
 
+/*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+   Relation    rel;
+   bool        nulls[Natts_pg_subscription];
+   bool        replaces[Natts_pg_subscription];
+   Datum       values[Natts_pg_subscription];
+   HeapTuple   tup;
+
+   /* Look up the subscription in the catalog */
+   rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+   tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+   if (!HeapTupleIsValid(tup))
+       elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+   LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+   /* Form a new tuple. */
+   memset(values, 0, sizeof(values));
+   memset(nulls, false, sizeof(nulls));
+   memset(replaces, false, sizeof(replaces));
+
+   /* Set the subscription to disabled. */
+   values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+   replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+   /* Update the catalog */
+   tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+                           replaces);
+   CatalogTupleUpdate(rel, &tup->t_self, tup);
+   heap_freetuple(tup);
+
+   table_close(rel, NoLock);
+}
+
 /*
  * get_subscription_oid - given a subscription name, look up the OID
  *
index 40b7bca5a96f046b4e32863e455e5efb1d5a007a..bb1ac30cd192623590bc01d292c5f0693421186e 100644 (file)
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subdisableonerr, subslotname,
+              subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
index 3ef6607d2460183053422e5444545dfd9b902c7c..3922658bbcae1730797164319345a2261a5c3c68 100644 (file)
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY              0x00000080
 #define SUBOPT_STREAMING           0x00000100
 #define SUBOPT_TWOPHASE_COMMIT     0x00000200
+#define SUBOPT_DISABLE_ON_ERR      0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
    bool        binary;
    bool        streaming;
    bool        twophase;
+   bool        disableonerr;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
        opts->streaming = false;
    if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
        opts->twophase = false;
+   if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+       opts->disableonerr = false;
 
    /* Parse options */
    foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
            opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
            opts->twophase = defGetBoolean(defel);
        }
+       else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+                strcmp(defel->defname, "disable_on_error") == 0)
+       {
+           if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+               errorConflictingDefElem(defel, pstate);
+
+           opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+           opts->disableonerr = defGetBoolean(defel);
+       }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
    supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
                      SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
                      SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-                     SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+                     SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+                     SUBOPT_DISABLE_ON_ERR);
    parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
    /*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        CharGetDatum(opts.twophase ?
                     LOGICALREP_TWOPHASE_STATE_PENDING :
                     LOGICALREP_TWOPHASE_STATE_DISABLED);
+   values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
    values[Anum_pg_subscription_subconninfo - 1] =
        CStringGetTextDatum(conninfo);
    if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
            {
                supported_opts = (SUBOPT_SLOT_NAME |
                                  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-                                 SUBOPT_STREAMING);
+                                 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
 
                parse_subscription_options(pstate, stmt->options,
                                           supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                    replaces[Anum_pg_subscription_substream - 1] = true;
                }
 
+               if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+               {
+                   values[Anum_pg_subscription_subdisableonerr - 1]
+                       = BoolGetDatum(opts.disableonerr);
+                   replaces[Anum_pg_subscription_subdisableonerr - 1]
+                       = true;
+               }
+
                update_tuple = true;
                break;
            }
index 8653e1d8402a7f918d74daf154761ab575794281..a1fe81b34f3d8e63eb4a542c3da7183155b10326 100644 (file)
@@ -305,6 +305,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void DisableSubscriptionAndExit(void);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -3374,6 +3376,84 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
    snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
 }
 
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+   char       *syncslotname;
+
+   Assert(am_tablesync_worker());
+
+   PG_TRY();
+   {
+       /* Call initial sync. */
+       syncslotname = LogicalRepSyncTableStart(origin_startpos);
+   }
+   PG_CATCH();
+   {
+       if (MySubscription->disableonerr)
+           DisableSubscriptionAndExit();
+       else
+       {
+           /*
+            * Report the worker failed during table synchronization. Abort
+            * the current transaction so that the stats message is sent in an
+            * idle state.
+            */
+           AbortOutOfAnyTransaction();
+           pgstat_report_subscription_error(MySubscription->oid, false);
+
+           PG_RE_THROW();
+       }
+   }
+   PG_END_TRY();
+
+   /* allocate slot name in long-lived context */
+   *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+   pfree(syncslotname);
+}
+
+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ *
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+   PG_TRY();
+   {
+       LogicalRepApplyLoop(origin_startpos);
+   }
+   PG_CATCH();
+   {
+       if (MySubscription->disableonerr)
+           DisableSubscriptionAndExit();
+       else
+       {
+           /*
+            * Report the worker failed while applying changes. Abort the
+            * current transaction so that the stats message is sent in an
+            * idle state.
+            */
+           AbortOutOfAnyTransaction();
+           pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+           PG_RE_THROW();
+       }
+   }
+   PG_END_TRY();
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3381,8 +3461,8 @@ ApplyWorkerMain(Datum main_arg)
    int         worker_slot = DatumGetInt32(main_arg);
    MemoryContext oldctx;
    char        originname[NAMEDATALEN];
-   XLogRecPtr  origin_startpos;
-   char       *myslotname;
+   XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
+   char       *myslotname = NULL;
    WalRcvStreamOptions options;
    int         server_version;
 
@@ -3477,32 +3557,7 @@ ApplyWorkerMain(Datum main_arg)
 
    if (am_tablesync_worker())
    {
-       char       *syncslotname;
-
-       PG_TRY();
-       {
-           /* This is table synchronization worker, call initial sync. */
-           syncslotname = LogicalRepSyncTableStart(&origin_startpos);
-       }
-       PG_CATCH();
-       {
-           /*
-            * Abort the current transaction so that we send the stats message
-            * in an idle state.
-            */
-           AbortOutOfAnyTransaction();
-
-           /* Report the worker failed during table synchronization */
-           pgstat_report_subscription_error(MySubscription->oid, false);
-
-           PG_RE_THROW();
-       }
-       PG_END_TRY();
-
-       /* allocate slot name in long-lived context */
-       myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
-       pfree(syncslotname);
+       start_table_sync(&origin_startpos, &myslotname);
 
        /*
         * Allocate the origin name in long-lived context for error context
@@ -3633,24 +3688,43 @@ ApplyWorkerMain(Datum main_arg)
    }
 
    /* Run the main loop. */
-   PG_TRY();
-   {
-       LogicalRepApplyLoop(origin_startpos);
-   }
-   PG_CATCH();
-   {
-       /*
-        * Abort the current transaction so that we send the stats message in
-        * an idle state.
-        */
-       AbortOutOfAnyTransaction();
+   start_apply(origin_startpos);
 
-       /* Report the worker failed while applying changes */
-       pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+   proc_exit(0);
+}
 
-       PG_RE_THROW();
-   }
-   PG_END_TRY();
+/*
+ * After error recovery, disable the subscription in a new transaction
+ * and exit cleanly.
+ */
+static void
+DisableSubscriptionAndExit(void)
+{
+   /*
+    * Emit the error message, and recover from the error state to an idle
+    * state
+    */
+   HOLD_INTERRUPTS();
+
+   EmitErrorReport();
+   AbortOutOfAnyTransaction();
+   FlushErrorState();
+
+   RESUME_INTERRUPTS();
+
+   /* Report the worker failed during either table synchronization or apply */
+   pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+                                    !am_tablesync_worker());
+
+   /* Disable the subscription */
+   StartTransactionCommand();
+   DisableSubscription(MySubscription->oid);
+   CommitTransactionCommand();
+
+   /* Notify the subscription has been disabled and exit */
+   ereport(LOG,
+           errmsg("logical replication subscription \"%s\" has been disabled due to an error",
+                  MySubscription->name));
 
    proc_exit(0);
 }
index e69dcf8a48416bb1492645a0d4ee9845a4c922d3..4dd24b8c89f273ad851c9575b32fc3d6df4d2f3c 100644 (file)
@@ -4293,6 +4293,7 @@ getSubscriptions(Archive *fout)
    int         i_subowner;
    int         i_substream;
    int         i_subtwophasestate;
+   int         i_subdisableonerr;
    int         i_subconninfo;
    int         i_subslotname;
    int         i_subsynccommit;
@@ -4340,10 +4341,13 @@ getSubscriptions(Archive *fout)
        appendPQExpBufferStr(query, " false AS substream,\n");
 
    if (fout->remoteVersion >= 150000)
-       appendPQExpBufferStr(query, " s.subtwophasestate\n");
+       appendPQExpBufferStr(query,
+                            " s.subtwophasestate,\n"
+                            " s.subdisableonerr\n");
    else
        appendPQExpBuffer(query,
-                         " '%c' AS subtwophasestate\n",
+                         " '%c' AS subtwophasestate,\n"
+                         " false AS subdisableonerr\n",
                          LOGICALREP_TWOPHASE_STATE_DISABLED);
 
    appendPQExpBufferStr(query,
@@ -4366,6 +4370,7 @@ getSubscriptions(Archive *fout)
    i_subbinary = PQfnumber(res, "subbinary");
    i_substream = PQfnumber(res, "substream");
    i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+   i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 
    subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4393,6 +4398,8 @@ getSubscriptions(Archive *fout)
            pg_strdup(PQgetvalue(res, i, i_substream));
        subinfo[i].subtwophasestate =
            pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+       subinfo[i].subdisableonerr =
+           pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 
        /* Decide whether we want to dump it */
        selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4463,6 +4470,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
    if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
        appendPQExpBufferStr(query, ", two_phase = on");
 
+   if (strcmp(subinfo->subdisableonerr, "t") == 0)
+       appendPQExpBufferStr(query, ", disable_on_error = true");
+
    if (strcmp(subinfo->subsynccommit, "off") != 0)
        appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
index 997a3b60719104db184160f9296bc7182f228f57..772dc0cf7a206bdbeaddbbc35596893a1f9ddb88 100644 (file)
@@ -657,6 +657,7 @@ typedef struct _SubscriptionInfo
    char       *subbinary;
    char       *substream;
    char       *subtwophasestate;
+   char       *subdisableonerr;
    char       *subsynccommit;
    char       *subpublications;
 } SubscriptionInfo;
index e3382933d98cac816ae210802ea6d3a8abe710a6..9229eacb6d9a41a780f84f1e95e39ae3834fc364 100644 (file)
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
    static const bool translate_columns[] = {false, false, false, false,
-   false, false, false, false, false};
+   false, false, false, false, false, false};
 
    if (pset.sversion < 100000)
    {
@@ -6118,11 +6118,13 @@ describeSubscriptions(const char *pattern, bool verbose)
                              gettext_noop("Binary"),
                              gettext_noop("Streaming"));
 
-       /* Two_phase is only supported in v15 and higher */
+       /* Two_phase and disable_on_error are only supported in v15 and higher */
        if (pset.sversion >= 150000)
            appendPQExpBuffer(&buf,
-                             ", subtwophasestate AS \"%s\"\n",
-                             gettext_noop("Two phase commit"));
+                             ", subtwophasestate AS \"%s\"\n"
+                             ", subdisableonerr AS \"%s\"\n",
+                             gettext_noop("Two phase commit"),
+                             gettext_noop("Disable on error"));
 
        appendPQExpBuffer(&buf,
                          ",  subsynccommit AS \"%s\"\n"
index 6d5c928c100791e8b7b177bdb59a8c6412a171c7..17172827a96015d01554b605f87b8f0d38a47960 100644 (file)
@@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end)
        COMPLETE_WITH("(", "PUBLICATION");
    /* ALTER SUBSCRIPTION <name> SET ( */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-       COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+       COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
    /* ALTER SUBSCRIPTION <name> SET PUBLICATION */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
    {
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
    else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
        COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
                      "enabled", "slot_name", "streaming",
-                     "synchronous_commit", "two_phase");
+                     "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
index 695990959e5db1bbec5089cdf07d6f71ed0aa0aa..1aa1d41e7959a7834735f1817d9a927fffa0b91f 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202203031
+#define CATALOG_VERSION_NO 202203141
 
 #endif
index 18c291289f634b3cd83390fa4f3c1405a54f6d57..e2befaf351232d1c21f19b21adeacbee3ff377a2 100644 (file)
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
    char        subtwophasestate;   /* Stream two-phase transactions */
 
+   bool        subdisableonerr;    /* True if a worker error should cause the
+                                    * subscription to be disabled */
+
 #ifdef CATALOG_VARLEN          /* variable-length fields start here */
    /* Connection string to the publisher */
    text        subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,9 @@ typedef struct Subscription
                                 * binary format */
    bool        stream;         /* Allow streaming in-progress transactions. */
    char        twophasestate;  /* Allow streaming two-phase transactions */
+   bool        disableonerr;   /* Indicates if the subscription should be
+                                * automatically disabled if a worker error
+                                * occurs */
    char       *conninfo;       /* Connection string to the publisher */
    char       *slotname;       /* Name of the replication slot */
    char       *synccommit;     /* Synchronous commit setting for worker */
@@ -111,6 +117,7 @@ typedef struct Subscription
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
+extern void DisableSubscription(Oid subid);
 extern Oid get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
 
index 80aae83562cb213465b362acda29ceef2be6c59f..ad8003fae12e3c038b41d246d41e23324c4d4abf 100644 (file)
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                     List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,33 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR:  disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
index bd0f4af1e490d9d392f00136b233dd93a6d12790..a7c15b1dafcc2ed77b6e31dc225e557adcc921db 100644 (file)
@@ -228,6 +228,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
new file mode 100644 (file)
index 0000000..5eca804
--- /dev/null
@@ -0,0 +1,94 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create the publisher node, initialized to allow logical streaming.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create the subscriber node, initialized without extra options.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical table on both nodes.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+
+# Insert duplicate values on the publisher, where tbl has no unique index.
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tbl (i) VALUES (1), (1), (1)");
+
+# Create a unique index on the subscriber only, so the duplicates conflict.
+$node_subscriber->safe_psql('postgres',
+   "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Create a pub/sub to set up logical replication. The uniqueness violation
+# should make initial synchronization fail and, because disable_on_error is
+# set, leave the subscription disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+   "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+   "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Drop the unique index on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the table sync to finish; abort if the poll gives up, as above.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+   "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Confirm that the table sync delivered all three duplicate rows.
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
+is($result, qq(1|3), "subscription sub replicated data");
+
+# Delete the data from the subscriber and recreate the unique index.
+$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
+$node_subscriber->safe_psql('postgres',
+   "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Add more duplicate rows on the publisher; they conflict with the new index.
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tbl (i) VALUES (3), (3), (3)");
+
+# Apply failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+   "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscription sub to be disabled";
+
+# Drop the unique index on the subscriber and re-enable the subscription. Then
+# confirm that the previously failing insert was applied OK.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT COUNT(*) FROM tbl WHERE i = 3");
+is($result, qq(3), 'check the result of apply');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();