SELECT pg_replication_origin_drop('regress_test_decoding: temp');
ERROR: replication origin "regress_test_decoding: temp" does not exist
+-- specifying reserved origin names is not supported
+SELECT pg_replication_origin_create('any');
+ERROR: replication origin name "any" is reserved
+DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
+SELECT pg_replication_origin_create('none');
+ERROR: replication origin name "none" is reserved
+DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
+SELECT pg_replication_origin_create('pg_replication_origin');
+ERROR: replication origin name "pg_replication_origin" is reserved
+DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
-- various failure checks for undefined slots
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
ERROR: replication origin "regress_test_decoding: temp" does not exist
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
+-- specifying reserved origin names is not supported
+SELECT pg_replication_origin_create('any');
+SELECT pg_replication_origin_create('none');
+SELECT pg_replication_origin_create('pg_replication_origin');
+
-- various failure checks for undefined slots
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
select pg_replication_origin_session_setup('regress_test_decoding: temp');
see <xref linkend="logical-replication-publication"/>.
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>suborigin</structfield> <type>text</type>
+ </para>
+ <para>
+ The origin value must be either <literal>none</literal> or
+ <literal>any</literal>. The default is <literal>any</literal>.
+ If <literal>none</literal>, the subscription will request the publisher
+ to only send changes that don't have an origin. If
+ <literal>any</literal>, the publisher sends changes regardless of their
+ origin.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>,
- <literal>binary</literal>, <literal>streaming</literal>, and
- <literal>disable_on_error</literal>.
+ <literal>binary</literal>, <literal>streaming</literal>,
+ <literal>disable_on_error</literal>, and
+ <literal>origin</literal>.
</para>
</listitem>
</varlistentry>
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>origin</literal> (<type>string</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether the subscription will request the publisher to only
+ send changes that don't have an origin or send changes regardless of
+ origin. Setting <literal>origin</literal> to <literal>none</literal>
+ means that the subscription will request the publisher to only send
+ changes that don't have an origin. Setting <literal>origin</literal>
+ to <literal>any</literal> means that the publisher sends changes
+ regardless of their origin. The default is <literal>any</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist></para>
</listitem>
Assert(!isnull);
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
+ /* Get origin */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_suborigin,
+ &isnull);
+ Assert(!isnull);
+ sub->origin = TextDatumGetCString(datum);
+
ReleaseSysCache(tup);
return sub;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
- subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
- subsynccommit, subpublications)
+ subbinary, substream, subtwophasestate, subdisableonerr,
+ subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_LSN 0x00000800
+#define SUBOPT_ORIGIN 0x00001000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
bool streaming;
bool twophase;
bool disableonerr;
+ char *origin;
XLogRecPtr lsn;
} SubOpts;
IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_COPY_DATA));
- /* Set default values for the boolean supported options. */
+ /* Set default values for the supported options. */
if (IsSet(supported_opts, SUBOPT_CONNECT))
opts->connect = true;
if (IsSet(supported_opts, SUBOPT_ENABLED))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
opts->disableonerr = false;
+ if (IsSet(supported_opts, SUBOPT_ORIGIN))
+ opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
/* Parse options */
foreach(lc, stmt_options)
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
+ strcmp(defel->defname, "origin") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_ORIGIN;
+ pfree(opts->origin);
+
+ /*
+ * Even though the "origin" parameter allows only "none" and "any"
+ * values, it is implemented as a string type so that the
+ * parameter can be extended in future versions to support
+ * filtering using origin names specified by the user.
+ */
+ opts->origin = defGetString(defel);
+
+ if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
+ (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized origin value: \"%s\"", opts->origin));
+ }
else if (IsSet(supported_opts, SUBOPT_LSN) &&
strcmp(defel->defname, "lsn") == 0)
{
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
- SUBOPT_DISABLE_ON_ERR);
+ SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
+ values[Anum_pg_subscription_suborigin - 1] =
+ CStringGetTextDatum(opts.origin);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
= true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
+ {
+ values[Anum_pg_subscription_suborigin - 1] =
+ CStringGetTextDatum(opts.origin);
+ replaces[Anum_pg_subscription_suborigin - 1] = true;
+ }
+
update_tuple = true;
break;
}
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
+ if (options->proto.logical.origin &&
+ PQserverVersion(conn->streamConn) >= 160000)
+ appendStringInfo(&cmd, ", origin '%s'",
+ options->proto.logical.origin);
+
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
+#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
}
+/*
+ * IsReservedOriginName
+ * True iff name is either "none" or "any".
+ */
+static bool
+IsReservedOriginName(const char *name)
+{
+ return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
+ (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
+}
+
/* ---------------------------------------------------------------------------
* Functions for working with replication origins themselves.
* ---------------------------------------------------------------------------
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
- /* Replication origins "pg_xxx" are reserved for internal use */
- if (IsReservedName(name))
+ /*
+ * Replication origins "any and "none" are reserved for system options.
+ * The origins "pg_xxx" are reserved for internal use.
+ */
+ if (IsReservedName(name) || IsReservedOriginName(name))
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("replication origin name \"%s\" is reserved",
name),
- errdetail("Origin names starting with \"pg_\" are reserved.")));
+ errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
+ LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
/*
* If built with appropriate switch, whine when regression-testing
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
+ strcmp(newsub->origin, MySubscription->origin) != 0 ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
options.proto.logical.twophase = false;
+ options.proto.logical.origin = pstrdup(MySubscription->origin);
if (!am_tablesync_worker())
{
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
+#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "executor/executor.h"
#include "fmgr.h"
static bool publications_valid;
static bool in_streaming;
+static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
bool messages_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
+ bool origin_option_given = false;
data->binary = false;
data->streaming = false;
data->two_phase = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "origin") == 0)
+ {
+ if (origin_option_given)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"));
+ origin_option_given = true;
+
+ data->origin = defGetString(defel);
+ if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
+ publish_no_origin = true;
+ else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
+ publish_no_origin = false;
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized origin value: \"%s\"", data->origin));
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
}
/*
- * Currently we always forward.
+ * Return true if the data is associated with an origin and the user has
+ * requested the changes that don't have an origin, false otherwise.
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
+ if (publish_no_origin && origin_id != InvalidRepOriginId)
+ return true;
+
return false;
}
int i_substream;
int i_subtwophasestate;
int i_subdisableonerr;
+ int i_suborigin;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
" s.subtwophasestate,\n"
- " s.subdisableonerr\n");
+ " s.subdisableonerr,\n");
else
appendPQExpBuffer(query,
" '%c' AS subtwophasestate,\n"
- " false AS subdisableonerr\n",
+ " false AS subdisableonerr,\n",
LOGICALREP_TWOPHASE_STATE_DISABLED);
+ if (fout->remoteVersion >= 160000)
+ appendPQExpBufferStr(query, " s.suborigin\n");
+ else
+ appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+
appendPQExpBufferStr(query,
"FROM pg_subscription s\n"
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
i_substream = PQfnumber(res, "substream");
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+ i_suborigin = PQfnumber(res, "suborigin");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
subinfo[i].subdisableonerr =
pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+ subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
if (strcmp(subinfo->subdisableonerr, "t") == 0)
appendPQExpBufferStr(query, ", disable_on_error = true");
+ if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
+ appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
char *substream;
char *subtwophasestate;
char *subdisableonerr;
+ char *suborigin;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
like => { %full_runs, section_post_data => 1, },
},
+ 'CREATE SUBSCRIPTION sub2' => {
+ create_order => 50,
+ create_sql => 'CREATE SUBSCRIPTION sub2
+ CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+ WITH (connect = false, origin = none);',
+ regexp => qr/^
+ \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+ /xm,
+ like => { %full_runs, section_post_data => 1, },
+ },
+
+ 'CREATE SUBSCRIPTION sub3' => {
+ create_order => 50,
+ create_sql => 'CREATE SUBSCRIPTION sub3
+ CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
+ WITH (connect = false, origin = any);',
+ regexp => qr/^
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+ /xm,
+ like => { %full_runs, section_post_data => 1, },
+ },
+
'ALTER PUBLICATION pub1 ADD TABLE test_table' => {
create_order => 51,
create_sql =>
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false, false, false, false};
+ false, false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
gettext_noop("Two-phase commit"),
gettext_noop("Disable on error"));
+ if (pset.sversion >= 160000)
+ appendPQExpBuffer(&buf,
+ ", suborigin AS \"%s\"\n",
+ gettext_noop("Origin"));
+
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
- COMPLETE_WITH("binary", "disable_on_error", "slot_name", "streaming", "synchronous_commit");
+ COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+ "streaming", "synchronous_commit");
/* ALTER SUBSCRIPTION <name> SKIP ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
COMPLETE_WITH("lsn");
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
- "disable_on_error", "enabled", "slot_name", "streaming",
- "synchronous_commit", "two_phase");
+ "disable_on_error", "enabled", "origin", "slot_name",
+ "streaming", "synchronous_commit", "two_phase");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202207201
+#define CATALOG_VERSION_NO 202207211
#endif
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+/*
+ * The subscription will request the publisher to only send changes that do not
+ * have any origin.
+ */
+#define LOGICALREP_ORIGIN_NONE "none"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
/* ----------------
* pg_subscription definition. cpp turns this into
* typedef struct FormData_pg_subscription
/* List of publications subscribed to */
text subpublications[1] BKI_FORCE_NOT_NULL;
+
+ /* Only publish data originating from the specified origin */
+ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
#endif
} FormData_pg_subscription;
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
+ char *origin; /* Only publish data originating from the
+ * specified origin */
} Subscription;
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
bool streaming;
bool messages;
bool two_phase;
+ char *origin;
} PGOutputData;
#endif /* PGOUTPUT_H */
bool streaming; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
+ char *origin; /* Only publish data originating from the
+ * specified origin */
} logical;
} proto;
} WalRcvStreamOptions;
ERROR: cannot enable subscription that does not have a slot name
ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- fail - origin must be either none or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
+ERROR: unrecognized origin value: "foo"
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+ regress_testsub4
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | none | off | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
+(1 row)
+
DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
-- fail - invalid connection string
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more then once
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
ALTER SUBSCRIPTION regress_testsub3 ENABLE;
ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
+-- fail - origin must be either none or any
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
+\dRs+ regress_testsub4
+ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
+\dRs+ regress_testsub4
+
DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
-- fail - invalid connection string
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
--- /dev/null
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test the CREATE SUBSCRIPTION 'origin' parameter.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################################################################
+# Setup a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->start;
+
+# Create table on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Create the same table on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
+my $appname_B1 = 'tap_sub_B1';
+$node_B->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub_B1
+ CONNECTION '$node_A_connstr application_name=$appname_B1'
+ PUBLICATION tap_pub_A
+ WITH (origin = none)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub_A
+ CONNECTION '$node_B_connstr application_name=$appname_A'
+ PUBLICATION tap_pub_B
+ WITH (origin = none, copy_data = off)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1, 1, 'Bidirectional replication setup is complete');
+
+my $result;
+
+###############################################################################
+# Check that bidirectional logical replication setup does not cause infinite
+# recursive insertion.
+###############################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is( $result, qq(11
+21),
+ 'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is( $result, qq(11
+21),
+ 'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+
+$node_A->safe_psql('postgres', "DELETE FROM tab;");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+###############################################################################
+# Check that remote data of node_B (that originated from node_C) is not
+# published to node_A.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+# Initialize node node_C
+my $node_C = PostgreSQL::Test::Cluster->new('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->start;
+
+$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_C (pub) -> node_B (sub)
+my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
+$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
+
+my $appname_B2 = 'tap_sub_B2';
+$node_B->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub_B2
+ CONNECTION '$node_C_connstr application_name=$appname_B2'
+ PUBLICATION tap_pub_C
+ WITH (origin = none)");
+
+$node_C->wait_for_catchup($appname_B2);
+
+$node_B->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert a record
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+
+$node_C->wait_for_catchup($appname_B2);
+$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($appname_B1);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(32), 'The node_C data replicated to node_B');
+
+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(),
+ 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
+);
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+$node_C->stop('fast');
+
+done_testing();