</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>subdisableonerr</structfield> <type>bool</type>
+ </para>
+ <para>
+ If true, the subscription will be disabled if one of its workers
+ detects an error
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
the replication origin name can be found from the server log (LSN 0/14C0378 and
replication origin <literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
- <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
- can be skipped by calling the
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
+ subscription can be used with the <literal>disable_on_error</literal> option.
+ Then, the transaction can be skipped by calling the
<link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>,
- <literal>binary</literal>, and
- <literal>streaming</literal>.
+ <literal>binary</literal>, <literal>streaming</literal>, and
+ <literal>disable_on_error</literal>.
</para>
</listitem>
</varlistentry>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether the subscription should be automatically disabled
+ if any errors are detected by subscription workers during data
+ replication from the publisher. The default is
+ <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</para>
sub->binary = subform->subbinary;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
+ sub->disableonerr = subform->subdisableonerr;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
pfree(sub);
}
+/*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+ Relation rel;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+ HeapTuple tup;
+
+ /* Look up the subscription in the catalog */
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* Set the subscription to disabled. */
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+ replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+ /* Update the catalog */
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+ heap_freetuple(tup);
+
+ table_close(rel, NoLock);
+}
+
/*
* get_subscription_oid - given a subscription name, look up the OID
*
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+ substream, subtwophasestate, subdisableonerr, subslotname,
+ subsynccommit, subpublications)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
+#define SUBOPT_DISABLE_ON_ERR 0x00000400
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
bool binary;
bool streaming;
bool twophase;
+ bool disableonerr;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
opts->streaming = false;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
+ if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+ opts->disableonerr = false;
/* Parse options */
foreach(lc, stmt_options)
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+ strcmp(defel->defname, "disable_on_error") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+ opts->disableonerr = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
+ values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
replaces[Anum_pg_subscription_substream - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+ {
+ values[Anum_pg_subscription_subdisableonerr - 1]
+ = BoolGetDatum(opts.disableonerr);
+ replaces[Anum_pg_subscription_subdisableonerr - 1]
+ = true;
+ }
+
update_tuple = true;
break;
}
static void maybe_reread_subscription(void);
+static void DisableSubscriptionAndExit(void);
+
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+ char *syncslotname;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+ pfree(syncslotname);
+}
+
+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ *
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed while applying changes. Abort the
+ * current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
WalRcvStreamOptions options;
int server_version;
if (am_tablesync_worker())
{
- char *syncslotname;
-
- PG_TRY();
- {
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message
- * in an idle state.
- */
- AbortOutOfAnyTransaction();
-
- /* Report the worker failed during table synchronization */
- pgstat_report_subscription_error(MySubscription->oid, false);
-
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
- pfree(syncslotname);
+ start_table_sync(&origin_startpos, &myslotname);
/*
* Allocate the origin name in long-lived context for error context
}
/* Run the main loop. */
- PG_TRY();
- {
- LogicalRepApplyLoop(origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message in
- * an idle state.
- */
- AbortOutOfAnyTransaction();
+ start_apply(origin_startpos);
- /* Report the worker failed while applying changes */
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ proc_exit(0);
+}
- PG_RE_THROW();
- }
- PG_END_TRY();
+/*
+ * After error recovery, disable the subscription in a new transaction
+ * and exit cleanly.
+ */
+static void
+DisableSubscriptionAndExit(void)
+{
+ /*
+ * Emit the error message, and recover from the error state to an idle
+ * state
+ */
+ HOLD_INTERRUPTS();
+
+ EmitErrorReport();
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ /* Report the worker failed during either table synchronization or apply */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+
+ /* Disable the subscription */
+ StartTransactionCommand();
+ DisableSubscription(MySubscription->oid);
+ CommitTransactionCommand();
+
+ /* Notify the subscription has been disabled and exit */
+ ereport(LOG,
+ errmsg("logical replication subscription \"%s\" has been disabled due to an error",
+ MySubscription->name));
proc_exit(0);
}
int i_subowner;
int i_substream;
int i_subtwophasestate;
+ int i_subdisableonerr;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
appendPQExpBufferStr(query, " false AS substream,\n");
if (fout->remoteVersion >= 150000)
- appendPQExpBufferStr(query, " s.subtwophasestate\n");
+ appendPQExpBufferStr(query,
+ " s.subtwophasestate,\n"
+ " s.subdisableonerr\n");
else
appendPQExpBuffer(query,
- " '%c' AS subtwophasestate\n",
+ " '%c' AS subtwophasestate,\n"
+ " false AS subdisableonerr\n",
LOGICALREP_TWOPHASE_STATE_DISABLED);
appendPQExpBufferStr(query,
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+ i_subdisableonerr = PQfnumber(res, "subdisableonerr");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
pg_strdup(PQgetvalue(res, i, i_substream));
subinfo[i].subtwophasestate =
pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+ subinfo[i].subdisableonerr =
+ pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
+ if (strcmp(subinfo->subdisableonerr, "t") == 0)
+ appendPQExpBufferStr(query, ", disable_on_error = true");
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
char *subbinary;
char *substream;
char *subtwophasestate;
+ char *subdisableonerr;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false, false};
+ false, false, false, false, false, false};
if (pset.sversion < 100000)
{
gettext_noop("Binary"),
gettext_noop("Streaming"));
- /* Two_phase is only supported in v15 and higher */
+ /* Two_phase and disable_on_error are only supported in v15 and higher */
if (pset.sversion >= 150000)
appendPQExpBuffer(&buf,
- ", subtwophasestate AS \"%s\"\n",
- gettext_noop("Two phase commit"));
+ ", subtwophasestate AS \"%s\"\n"
+ ", subdisableonerr AS \"%s\"\n",
+ gettext_noop("Two phase commit"),
+ gettext_noop("Disable on error"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
- COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+ COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
{
else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"enabled", "slot_name", "streaming",
- "synchronous_commit", "two_phase");
+ "synchronous_commit", "two_phase", "disable_on_error");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202203031
+#define CATALOG_VERSION_NO 202203141
#endif
char subtwophasestate; /* Stream two-phase transactions */
+ bool subdisableonerr; /* True if a worker error should cause the
+ * subscription to be disabled */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char twophasestate; /* Allow streaming two-phase transactions */
+ bool disableonerr; /* Indicates if the subscription should be
+ * automatically disabled if a worker error
+ * occurs */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
+extern void DisableSubscription(Oid subid);
extern Oid get_subscription_oid(const char *subname, bool missing_ok);
extern char *get_subscription_name(Oid subid, bool missing_ok);
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication already exists
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication used more then once
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist
(1 row)
--fail - alter of two_phase option not supported.
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR: disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
--- /dev/null
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical table on both nodes.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+
+# Insert duplicate values on the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (1), (1), (1)");
+
+# Create an additional unique index on the subscriber.
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Drop the unique index on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
+is($result, qq(1|3), "subscription sub replicated data");
+
+# Delete the data from the subscriber and recreate the unique index.
+$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Add more non-unique data to the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (3), (3), (3)");
+
+# Apply failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscription sub to be disabled";
+
+# Drop the unique index on the subscriber and re-enabled the subscription. Then
+# confirm that the previously failing insert was applied OK.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT COUNT(*) FROM tbl WHERE i = 3");
+is($result, qq(3), 'check the result of apply');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();