</para>
<para>
This is an array of <structfield>indnatts</structfield> values that
- indicate which table columns this index indexes. For example a value
+ indicate which table columns this index indexes. For example, a value
of <literal>1 3</literal> would mean that the first and the third table
columns make up the index entries. Key columns come before non-key
(included) columns. A zero in this array indicates that the
Reference to schema
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>prattrs</structfield> <type>int2vector</type>
+ (references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attnum</structfield>)
+ </para>
+ <para>
+ This is an array of values that indicates which table columns are
+ part of the publication. For example, a value of <literal>1 3</literal>
+ would mean that the first and the third table columns are published.
+ A null value indicates that all columns are published.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
</listitem>
</varlistentry>
</variablelist>
- Next, the following message part appears for each column (except generated columns):
+ Next, the following message part appears for each column included in
+ the publication (except generated columns):
<variablelist>
<varlistentry>
<term>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
+ </para>
+
+ <para>
+ Optionally, a column list can be specified. See <xref
+ linkend="sql-createpublication"/> for details.
+ </para>
+
+ <para>
If the optional <literal>WHERE</literal> clause is specified, rows for
which the <replaceable class="parameter">expression</replaceable>
evaluates to false or null will not be published. Note that parentheses
<para>
Add some tables to the publication:
<programlisting>
-ALTER PUBLICATION mypublication ADD TABLE users, departments;
+ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
+</programlisting></para>
+
+ <para>
+ Change the set of columns published for a table:
+<programlisting>
+ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments;
</programlisting></para>
<para>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
<literal>TRUNCATE</literal> commands.
</para>
+ <para>
+ When a column list is specified, only the named columns are replicated.
+ If no column list is specified, all columns of the table are replicated
+ through this publication, including any columns added later. If a column
+ list is specified, it must include the replica identity columns.
+ </para>
+
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
<structname>sales</structname>:
<programlisting>
CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales;
+</programlisting></para>
+
+ <para>
+ Create a publication that publishes all changes for table <structname>users</structname>,
+ but replicates only columns <structname>user_id</structname> and
+ <structname>firstname</structname>:
+<programlisting>
+CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname);
</programlisting></para>
</refsect1>
#include "utils/rel.h"
#include "utils/syscache.h"
+static void publication_translate_columns(Relation targetrel, List *columns,
+ int *natts, AttrNumber **attrs);
+
/*
* Check if relation can be in given publication and throws appropriate
* error if not.
Oid relid = RelationGetRelid(targetrel);
Oid pubreloid;
Publication *pub = GetPublication(pubid);
+ AttrNumber *attarray = NULL;
+ int natts = 0;
ObjectAddress myself,
referenced;
List *relids = NIL;
check_publication_add_relation(targetrel);
+ /*
+ * Translate column names to attnums and make sure the column list contains
+ * only allowed elements (no system or generated columns etc.). Also build
+ * an array of attnums, for storing in the catalog.
+ */
+ publication_translate_columns(pri->relation, pri->columns,
+ &natts, &attarray);
+
/* Form a tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
else
nulls[Anum_pg_publication_rel_prqual - 1] = true;
+ /* Add column list, if available */
+ if (pri->columns)
+ values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
+ else
+ nulls[Anum_pg_publication_rel_prattrs - 1] = true;
+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
false);
+ /* Add dependency on the columns, if any are listed */
+ for (int i = 0; i < natts; i++)
+ {
+ ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
/* Close the table. */
table_close(rel, RowExclusiveLock);
return myself;
}
+/* qsort comparator for attnums */
+static int
+compare_int16(const void *a, const void *b)
+{
+ int av = *(const int16 *) a;
+ int bv = *(const int16 *) b;
+
+ /* this can't overflow if int is wider than int16 */
+ return (av - bv);
+}
+
+/*
+ * Translate a list of column names to an array of attribute numbers
+ * and a Bitmapset with them; verify that each attribute is appropriate
+ * to have in a publication column list (no system or generated attributes,
+ * no duplicates). Additional checks with replica identity are done later;
+ * see check_publication_columns.
+ *
+ * Note that the attribute numbers are *not* offset by
+ * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
+ * is okay.
+ */
+static void
+publication_translate_columns(Relation targetrel, List *columns,
+ int *natts, AttrNumber **attrs)
+{
+ AttrNumber *attarray = NULL;
+ Bitmapset *set = NULL;
+ ListCell *lc;
+ int n = 0;
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ /* Bail out when no column list defined. */
+ if (!columns)
+ return;
+
+ /*
+ * Translate list of columns to attnums. We prohibit system attributes and
+ * make sure there are no duplicate columns.
+ */
+ attarray = palloc(sizeof(AttrNumber) * list_length(columns));
+ foreach(lc, columns)
+ {
+ char *colname = strVal(lfirst(lc));
+ AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
+
+ if (attnum == InvalidAttrNumber)
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colname, RelationGetRelationName(targetrel)));
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot reference system column \"%s\" in publication column list",
+ colname));
+
+ if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot reference generated column \"%s\" in publication column list",
+ colname));
+
+ if (bms_is_member(attnum, set))
+ ereport(ERROR,
+ errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("duplicate column \"%s\" in publication column list",
+ colname));
+
+ set = bms_add_member(set, attnum);
+ attarray[n++] = attnum;
+ }
+
+ /* Be tidy, so that the catalog representation is always sorted */
+ qsort(attarray, n, sizeof(AttrNumber), compare_int16);
+
+ *natts = n;
+ *attrs = attarray;
+
+ bms_free(set);
+}
+
+/*
+ * Transform the column list (represented by an array) to a bitmapset.
+ */
+Bitmapset *
+pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
+{
+ Bitmapset *result = NULL;
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
+ MemoryContext oldcxt;
+
+ /*
+ * If an existing bitmap was provided, use it. Otherwise just use NULL
+ * and build a new bitmap.
+ */
+ if (columns)
+ result = columns;
+
+ arr = DatumGetArrayTypeP(pubcols);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ /* If a memory context was specified, switch to it. */
+ if (mcxt)
+ oldcxt = MemoryContextSwitchTo(mcxt);
+
+ for (int i = 0; i < nelems; i++)
+ result = bms_add_member(result, elems[i]);
+
+ if (mcxt)
+ MemoryContextSwitchTo(oldcxt);
+
+ return result;
+}
+
/*
* Insert new publication / schema mapping.
*/
* Returns true if any invalid column is found.
*/
bool
-contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
+pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
bool pubviaroot)
{
HeapTuple rftuple;
return result;
}
+/*
+ * Check if all columns referenced in the REPLICA IDENTITY are covered by
+ * the column list.
+ *
+ * Returns true if any replica identity column is not covered by column list.
+ */
+bool
+pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
+ bool pubviaroot)
+{
+ HeapTuple tuple;
+ Oid relid = RelationGetRelid(relation);
+ Oid publish_as_relid = RelationGetRelid(relation);
+ bool result = false;
+ Datum datum;
+ bool isnull;
+
+ /*
+ * For a partition, if pubviaroot is true, find the topmost ancestor that
+ * is published via this publication as we need to use its column list
+ * for the changes.
+ *
+ * Note that even though the column list used is for an ancestor, the
+ * REPLICA IDENTITY used will be for the actual child table.
+ */
+ if (pubviaroot && relation->rd_rel->relispartition)
+ {
+ publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
+
+ if (!OidIsValid(publish_as_relid))
+ publish_as_relid = relid;
+ }
+
+ tuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(publish_as_relid),
+ ObjectIdGetDatum(pubid));
+
+ if (!HeapTupleIsValid(tuple))
+ return false;
+
+ datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+
+ if (!isnull)
+ {
+ int x;
+ Bitmapset *idattrs;
+ Bitmapset *columns = NULL;
+
+ /* With REPLICA IDENTITY FULL, no column list is allowed. */
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ result = true;
+
+ /* Transform the column list datum to a bitmapset. */
+ columns = pub_collist_to_bitmapset(NULL, datum, NULL);
+
+ /* Remember columns that are part of the REPLICA IDENTITY */
+ idattrs = RelationGetIndexAttrBitmap(relation,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+ /*
+ * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
+ * offset (to handle system columns the usual way), while column list
+ * does not use offset, so we can't do bms_is_subset(). Instead, we have
+ * to loop over the idattrs and check all of them are in the list.
+ */
+ x = -1;
+ while ((x = bms_next_member(idattrs, x)) >= 0)
+ {
+ AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
+
+ /*
+ * If pubviaroot is true, we are validating the column list of the
+ * parent table, but the bitmap contains the replica identity
+ * information of the child table. The parent/child attnums may not
+ * match, so translate them to the parent - get the attname from
+ * the child, and look it up in the parent.
+ */
+ if (pubviaroot)
+ {
+ /* attribute name in the child table */
+ char *colname = get_attname(relid, attnum, false);
+
+ /*
+ * Determine the attnum for the attribute name in parent (we
+ * are using the column list defined on the parent).
+ */
+ attnum = get_attnum(publish_as_relid, colname);
+ }
+
+ /* replica identity column, not covered by the column list */
+ if (!bms_is_member(attnum, columns))
+ {
+ result = true;
+ break;
+ }
+ }
+
+ bms_free(idattrs);
+ bms_free(columns);
+ }
+
+ ReleaseSysCache(tuple);
+
+ return result;
+}
+
/* check_functions_in_node callback */
static bool
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
}
}
+
+/*
+ * Check the publication column lists expression for all relations in the list.
+ */
+static void
+CheckPubRelationColumnList(List *tables, const char *queryString,
+ bool pubviaroot)
+{
+ ListCell *lc;
+
+ foreach(lc, tables)
+ {
+ PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
+
+ if (pri->columns == NIL)
+ continue;
+
+ /*
+ * If the publication doesn't publish changes via the root partitioned
+ * table, the partition's column list will be used. So disallow using
+ * the column list on partitioned table in this case.
+ */
+ if (!pubviaroot &&
+ pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot use publication column list for relation \"%s\"",
+ RelationGetRelationName(pri->relation)),
+ errdetail("column list cannot be used for a partitioned table when %s is false.",
+ "publish_via_partition_root")));
+ }
+}
+
/*
* Create new publication.
*/
TransformPubWhereClauses(rels, pstate->p_sourcetext,
publish_via_partition_root);
+ CheckPubRelationColumnList(rels, pstate->p_sourcetext,
+ publish_via_partition_root);
+
PublicationAddRelations(puboid, rels, true, NULL);
CloseRelationList(rels);
}
/*
* If the publication doesn't publish changes via the root partitioned
- * table, the partition's row filter will be used. So disallow using WHERE
- * clause on partitioned table in this case.
+ * table, the partition's row filter and column list will be used. So disallow
+ * using WHERE clause and column lists on partitioned table in this case.
*/
if (!pubform->puballtables && publish_via_partition_root_given &&
!publish_via_partition_root)
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add partitioned table(s) with WHERE
- * clause(s) which we don't allow when not publishing via root.
+ * clause(s) and/or column lists which we don't allow when not
+ * publishing via root.
*/
LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
AccessShareLock);
{
HeapTuple rftuple;
Oid relid = lfirst_oid(lc);
+ bool has_column_list;
+ bool has_row_filter;
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubform->oid));
+ has_row_filter
+ = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
+
+ has_column_list
+ = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
+
if (HeapTupleIsValid(rftuple) &&
- !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL))
+ (has_row_filter || has_column_list))
{
HeapTuple tuple;
{
Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple);
- if (relform->relkind == RELKIND_PARTITIONED_TABLE)
+ if ((relform->relkind == RELKIND_PARTITIONED_TABLE) &&
+ has_row_filter)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot set %s for publication \"%s\"",
NameStr(relform->relname),
"publish_via_partition_root")));
+ if ((relform->relkind == RELKIND_PARTITIONED_TABLE) &&
+ has_column_list)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot set %s for publication \"%s\"",
+ "publish_via_partition_root = false",
+ stmt->pubname),
+ errdetail("The publication contains a column list for a partitioned table \"%s\" "
+ "which is not allowed when %s is false.",
+ NameStr(relform->relname),
+ "publish_via_partition_root")));
+
ReleaseSysCache(tuple);
}
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+ CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
+
PublicationAddRelations(pubid, rels, false, stmt);
}
else if (stmt->action == AP_DropObjects)
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+ CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
+
/*
* To recreate the relation list for the publication, look for
* existing relations that do not need to be dropped.
PublicationRelInfo *oldrel;
bool found = false;
HeapTuple rftuple;
- bool rfisnull = true;
Node *oldrelwhereclause = NULL;
+ Bitmapset *oldcolumns = NULL;
/* look up the cache for the old relmap */
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(oldrelid),
ObjectIdGetDatum(pubid));
+ /*
+ * See if the existing relation currently has a WHERE clause or a
+ * column list. We need to compare those too.
+ */
if (HeapTupleIsValid(rftuple))
{
+ bool isnull = true;
Datum whereClauseDatum;
+ Datum columnListDatum;
+ /* Load the WHERE clause for this table. */
whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prqual,
- &rfisnull);
- if (!rfisnull)
+ &isnull);
+ if (!isnull)
oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
+ /* Transform the int2vector column list to a bitmap. */
+ columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+
+ if (!isnull)
+ oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
+
ReleaseSysCache(rftuple);
}
foreach(newlc, rels)
{
PublicationRelInfo *newpubrel;
+ Oid newrelid;
+ Bitmapset *newcolumns = NULL;
newpubrel = (PublicationRelInfo *) lfirst(newlc);
+ newrelid = RelationGetRelid(newpubrel->relation);
+
+ /*
+ * If the new publication has column list, transform it to
+ * a bitmap too.
+ */
+ if (newpubrel->columns)
+ {
+ ListCell *lc;
+
+ foreach(lc, newpubrel->columns)
+ {
+ char *colname = strVal(lfirst(lc));
+ AttrNumber attnum = get_attnum(newrelid, colname);
+
+ newcolumns = bms_add_member(newcolumns, attnum);
+ }
+ }
/*
* Check if any of the new set of relations matches with the
* existing relations in the publication. Additionally, if the
* relation has an associated WHERE clause, check the WHERE
- * expressions also match. Drop the rest.
+ * expressions also match. Same for the column list. Drop the
+ * rest.
*/
if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
- if (equal(oldrelwhereclause, newpubrel->whereClause))
+ if (equal(oldrelwhereclause, newpubrel->whereClause) &&
+ bms_equal(oldcolumns, newcolumns))
{
found = true;
break;
{
oldrel = palloc(sizeof(PublicationRelInfo));
oldrel->whereClause = NULL;
+ oldrel->columns = NIL;
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);
delrels = lappend(delrels, oldrel);
{
oldrel = palloc(sizeof(PublicationRelInfo));
oldrel->whereClause = NULL;
+ oldrel->columns = NULL;
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);
delrels = lappend(delrels, oldrel);
List *result = NIL;
ListCell *lc;
List *relids_with_rf = NIL;
+ List *relids_with_collist = NIL;
/*
* Open, share-lock, and check all the explicitly-specified relations
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
+ /* Disallow duplicate tables if there are any with column lists. */
+ if (t->columns || list_member_oid(relids_with_collist, myrelid))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("conflicting or redundant column lists for table \"%s\"",
+ RelationGetRelationName(rel))));
+
table_close(rel, ShareUpdateExclusiveLock);
continue;
}
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
pub_rel->whereClause = t->whereClause;
+ pub_rel->columns = t->columns;
result = lappend(result, pub_rel);
relids = lappend_oid(relids, myrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, myrelid);
+ if (t->columns)
+ relids_with_collist = lappend_oid(relids_with_collist, myrelid);
+
/*
* Add children of this rel, if requested, so that they too are added
* to the publication. A partitioned table can't have any inheritance
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
+ /*
+ * We don't allow to specify column list for both parent
+ * and child table at the same time as it is not very
+ * clear which one should be given preference.
+ */
+ if (childrelid != myrelid &&
+ (t->columns || list_member_oid(relids_with_collist, childrelid)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("conflicting or redundant column lists for table \"%s\"",
+ RelationGetRelationName(rel))));
+
continue;
}
pub_rel->relation = rel;
/* child inherits WHERE clause from parent */
pub_rel->whereClause = t->whereClause;
+ /* child inherits column list from parent */
+ pub_rel->columns = t->columns;
result = lappend(result, pub_rel);
relids = lappend_oid(relids, childrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, childrelid);
+
+ if (t->columns)
+ relids_with_collist = lappend_oid(relids_with_collist, childrelid);
}
}
}
Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
+ if (pubrel->columns)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
+
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubid));
if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
return;
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- return;
-
/*
* It is only safe to execute UPDATE/DELETE when all columns, referenced
* in the row filters from publications which the relation is in, are
errmsg("cannot update table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+ else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot update table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column list used by the publication does not cover the replica identity.")));
else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot delete from table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+ else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot delete from table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column list used by the publication does not cover the replica identity.")));
/* If relation has replica identity we are always good. */
if (OidIsValid(RelationGetReplicaIndex(rel)))
return;
+ /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
+ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ return;
+
/*
* This is UPDATE/DELETE and there is no replica identity.
*
COPY_NODE_FIELD(relation);
COPY_NODE_FIELD(whereClause);
+ COPY_NODE_FIELD(columns);
return newnode;
}
{
COMPARE_NODE_FIELD(relation);
COMPARE_NODE_FIELD(whereClause);
+ COMPARE_NODE_FIELD(columns);
return true;
}
* relation_expr here.
*/
PublicationObjSpec:
- TABLE relation_expr OptWhereClause
+ TABLE relation_expr opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
- $$->pubtable->whereClause = $3;
+ $$->pubtable->columns = $3;
+ $$->pubtable->whereClause = $4;
}
| ALL TABLES IN_P SCHEMA ColId
{
$$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA;
$$->location = @5;
}
- | ColId OptWhereClause
+ | ColId opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
- if ($2)
+ /*
+ * If either a row filter or column list is specified, create
+ * a PublicationTable object.
+ */
+ if ($2 || $3)
{
/*
* The OptWhereClause must be stored here but it is
*/
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
- $$->pubtable->whereClause = $2;
+ $$->pubtable->columns = $2;
+ $$->pubtable->whereClause = $3;
}
else
{
}
$$->location = @1;
}
- | ColId indirection OptWhereClause
+ | ColId indirection opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
- $$->pubtable->whereClause = $3;
+ $$->pubtable->columns = $3;
+ $$->pubtable->whereClause = $4;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
- | extended_relation_expr OptWhereClause
+ | extended_relation_expr opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
- $$->pubtable->whereClause = $2;
+ $$->pubtable->columns = $2;
+ $$->pubtable->whereClause = $3;
}
| CURRENT_SCHEMA
{
errmsg("WHERE clause not allowed for schema"),
parser_errposition(pubobj->location));
+ /* Column list is not allowed on a schema object */
+ if (pubobj->pubtable && pubobj->pubtable->columns)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column specification not allowed for schema"),
+ parser_errposition(pubobj->location));
+
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+ Bitmapset *columns);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
- bool binary);
+ bool binary, Bitmapset *columns);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
+/*
+ * Check if a column is covered by a column list.
+ *
+ * Need to be careful about NULL, which is treated as a column list covering
+ * all columns.
+ */
+static bool
+column_in_column_list(int attnum, Bitmapset *columns)
+{
+ return (columns == NULL || bms_is_member(attnum, columns));
+}
+
+
/*
* Write BEGIN to the output stream.
*/
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- TupleTableSlot *newslot, bool binary)
+ TupleTableSlot *newslot, bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
- bool binary)
+ bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
/*
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+ Bitmapset *columns)
{
char *relname;
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
- logicalrep_write_attrs(out, rel);
+ logicalrep_write_attrs(out, rel, columns);
}
/*
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
- bool binary)
+ bool binary, Bitmapset *columns)
{
TupleDesc desc;
Datum *values;
for (i = 0; i < desc->natts; i++)
{
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ if (!column_in_column_list(att->attnum, columns))
continue;
+
nliveatts++;
}
pq_sendint16(out, nliveatts);
if (att->attisdropped || att->attgenerated)
continue;
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
if (isnull[i])
{
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
* Write relation attribute metadata to the stream.
*/
static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
TupleDesc desc;
int i;
/* send number of live attributes */
for (i = 0; i < desc->natts; i++)
{
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
continue;
+
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
nliveatts++;
}
pq_sendint16(out, nliveatts);
if (att->attisdropped || att->attgenerated)
continue;
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
ListCell *lc;
bool first;
+ Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
lrel->relname = relname;
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- /* Now fetch columns. */
+
+ /*
+ * Get column lists for each relation.
+ *
+ * For initial synchronization, column lists can be ignored in following
+ * cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any column list
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ *
+ * We need to do this before fetching info about column names and types,
+ * so that we can skip columns that should not be replicated.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ WalRcvExecResult *pubres;
+ TupleTableSlot *slot;
+ Oid attrsRow[] = {INT2OID};
+ StringInfoData pub_names;
+ bool first = true;
+
+ initStringInfo(&pub_names);
+ foreach(lc, MySubscription->publications)
+ {
+ if (!first)
+ appendStringInfo(&pub_names, ", ");
+ appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
+ first = false;
+ }
+
+ /*
+ * Fetch info about column lists for the relation (from all the
+ * publications). We unnest the int2vector values, because that
+ * makes it easier to combine lists by simply adding the attnums
+ * to a new bitmap (without having to parse the int2vector data).
+ * This preserves NULL values, so that if one of the publications
+ * has no column list, we'll know that.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT unnest"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
+ " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrsRow), attrsRow);
+
+ if (pubres->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, pubres->err)));
+
+ /*
+ * Merge the column lists (from different publications) by creating
+ * a single bitmap with all the attnums. If we find a NULL value,
+ * that means one of the publications has no column list for the
+ * table we're syncing.
+ */
+ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+ {
+ Datum cfval = slot_getattr(slot, 1, &isnull);
+
+ /* NULL means empty column list, so we're done. */
+ if (isnull)
+ {
+ bms_free(included_cols);
+ included_cols = NULL;
+ break;
+ }
+
+ included_cols = bms_add_member(included_cols,
+ DatumGetInt16(cfval));
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(pubres);
+
+ pfree(pub_names.data);
+ }
+
+ /*
+ * Now fetch column names and types.
+ */
resetStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT a.attname,"
+ "SELECT a.attnum,"
+ " a.attname,"
" a.atttypid,"
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL;
+ /*
+ * Store the columns as a list of names. Ignore those that are not
+ * present in the column list, if there is one.
+ */
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
- lrel->attnames[natt] =
- TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ char *rel_colname;
+ AttrNumber attnum;
+
+ attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
- lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+ /* If the column is not in the column list, skip it. */
+ if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+ {
+ ExecClearTuple(slot);
+ continue;
+ }
+
+ rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ lrel->attnames[natt] = rel_colname;
+ lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
- if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+ if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
/* Should never happen. */
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ {
+ appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're replicating
+ * all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
+
+ appendStringInfo(&cmd, ") TO STDOUT");
+ }
else
{
/*
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx);
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
*/
ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
EState *estate; /* executor state used for row filter */
- MemoryContext cache_expr_cxt; /* private context for exprstate and
- * estate, if any */
-
TupleTableSlot *new_slot; /* slot for storing new tuple */
TupleTableSlot *old_slot; /* slot for storing old tuple */
* having identical TupleDesc.
*/
AttrMap *attrmap;
+
+ /*
+ * Columns included in the publication, or NULL if all columns are
+ * included implicitly. Note that the attnums in this bitmap are not
+ * shifted by FirstLowInvalidHeapAttributeNumber.
+ */
+ Bitmapset *columns;
+
+ /*
+ * Private context to store additional data for this entry - state for
+ * the row filter expressions, column list, etc.
+ */
+ MemoryContext entry_cxt;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
RelationSyncEntry *entry,
ReorderBufferChangeType *action);
+/* column list routines */
+static void pgoutput_column_list_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+
/*
* Specify output plugin callbacks
*/
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- send_relation_and_attrs(ancestor, xid, ctx);
+ send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, xid, ctx);
+ send_relation_and_attrs(relation, xid, ctx, relentry->columns);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
*/
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx)
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
if (att->atttypid < FirstGenbkiObjectId)
continue;
+ /* Skip this attribute if it's not present in the column list */
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
+
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, xid, relation);
+ logicalrep_write_rel(ctx->out, xid, relation, columns);
OutputPluginWrite(ctx, false);
}
return DatumGetBool(ret);
}
+/*
+ * Make sure the per-entry memory context exists.
+ */
+static void
+pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
+{
+ Relation relation;
+
+ /* The context may already exist, in which case bail out. */
+ if (entry->entry_cxt)
+ return;
+
+ relation = RelationIdGetRelation(entry->publish_as_relid);
+
+ entry->entry_cxt = AllocSetContextCreate(data->cachectx,
+ "entry private context",
+ ALLOCSET_SMALL_SIZES);
+
+ MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
+ RelationGetRelationName(relation));
+}
+
/*
* Initialize the row filter.
*/
{
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
- Assert(entry->cache_expr_cxt == NULL);
-
- /* Create the memory context for row filters */
- entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
- "Row filter expressions",
- ALLOCSET_DEFAULT_SIZES);
-
- MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
- RelationGetRelationName(relation));
+ pgoutput_ensure_entry_cxt(data, entry);
/*
* Now all the filters for all pubactions are known. Combine them when
* their pubactions are the same.
*/
- oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
+ oldctx = MemoryContextSwitchTo(entry->entry_cxt);
entry->estate = create_estate_for_relation(relation);
for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
{
}
}
+/*
+ * Initialize the column list.
+ */
+static void
+pgoutput_column_list_init(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ ListCell *lc;
+
+ /*
+ * Find if there are any column lists for this relation. If there are,
+ * build a bitmap merging all the column lists.
+ *
+ * All the given publication-table mappings must be checked.
+ *
+ * Multiple publications might have multiple column lists for this relation.
+ *
+ * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
+ * list" so it takes precedence.
+ */
+ foreach(lc, publications)
+ {
+ Publication *pub = lfirst(lc);
+ HeapTuple cftuple = NULL;
+ Datum cfdatum = 0;
+
+ /*
+ * Assume there's no column list. Only if we find pg_publication_rel
+ * entry with a column list we'll switch it to false.
+ */
+ bool pub_no_list = true;
+
+ /*
+ * If the publication is FOR ALL TABLES then it is treated the same as if
+ * there are no column lists (even if other publications have a list).
+ */
+ if (!pub->alltables)
+ {
+ /*
+ * Check for the presence of a column list in this publication.
+ *
+ * Note: If we find no pg_publication_rel row, it's a publication
+ * defined for a whole schema, so it can't have a column list, just
+ * like a FOR ALL TABLES publication.
+ */
+ cftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(entry->publish_as_relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(cftuple))
+ {
+ /*
+ * Lookup the column list attribute.
+ *
+ * Note: We update the pub_no_list value directly, because if
+ * the value is NULL, we have no list (and vice versa).
+ */
+ cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
+ Anum_pg_publication_rel_prattrs,
+ &pub_no_list);
+
+ /*
+ * Build the column list bitmap in the per-entry context.
+ *
+ * We need to merge column lists from all publications, so we
+ * update the same bitmapset. If the column list is null, we
+ * interpret it as replicating all columns.
+ */
+ if (!pub_no_list) /* when not null */
+ {
+ pgoutput_ensure_entry_cxt(data, entry);
+
+ entry->columns = pub_collist_to_bitmapset(entry->columns,
+ cfdatum,
+ entry->entry_cxt);
+ }
+ }
+ }
+
+ /*
+ * Found a publication with no column list, so we're done. But first
+ * discard column list we might have from preceding publications.
+ */
+ if (pub_no_list)
+ {
+ if (cftuple)
+ ReleaseSysCache(cftuple);
+
+ bms_free(entry->columns);
+ entry->columns = NULL;
+
+ break;
+ }
+
+ ReleaseSysCache(cftuple);
+ } /* loop all subscribed publications */
+
+}
+
/*
* Initialize the slot for storing new and old tuples, and build the map that
* will be used to convert the relation's tuples into the ancestor's format.
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
- data->binary);
+ data->binary, relentry->columns);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel,
- new_slot, data->binary);
+ new_slot, data->binary,
+ relentry->columns);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel,
- old_slot, new_slot, data->binary);
+ old_slot, new_slot, data->binary,
+ relentry->columns);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel,
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
- entry->cache_expr_cxt = NULL;
+ entry->entry_cxt = NULL;
entry->publish_as_relid = InvalidOid;
+ entry->columns = NULL;
entry->attrmap = NULL;
}
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
entry->pubactions.pubinsert = false;
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
/*
* Row filter cache cleanups.
*/
- if (entry->cache_expr_cxt)
- MemoryContextDelete(entry->cache_expr_cxt);
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
- entry->cache_expr_cxt = NULL;
+ entry->entry_cxt = NULL;
entry->estate = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
/*
* Build publication cache. We can't use one provided by relcache as
- * relcache considers all publications given relation is in, but here
- * we only need to consider ones that the subscriber requested.
+ * relcache considers all publications that the given relation is in,
+ * but here we only need to consider ones that the subscriber
+ * requested.
*/
foreach(lc, data->publications)
{
}
/*
+ * If the relation is to be published, determine actions to
+ * publish, and list of columns, if appropriate.
+ *
* Don't publish changes for partitioned tables, because
* publishing those of its partitions suffices, unless partition
* changes won't be published due to pubviaroot being set.
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
+
+ /* Initialize the column list */
+ pgoutput_column_list_init(data, rel_publications, entry);
}
list_free(pubids);
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
+ pubdesc->cols_valid_for_update = true;
+ pubdesc->cols_valid_for_delete = true;
return;
}
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
+ pubdesc->cols_valid_for_update = true;
+ pubdesc->cols_valid_for_delete = true;
/* Fetch the publication membership info. */
puboids = GetRelationPublications(relid);
*/
if (!pubform->puballtables &&
(pubform->pubupdate || pubform->pubdelete) &&
- contain_invalid_rfcolumn(pubid, relation, ancestors,
+ pub_rf_contains_invalid_column(pubid, relation, ancestors,
pubform->pubviaroot))
{
if (pubform->pubupdate)
pubdesc->rf_valid_for_delete = false;
}
+ /*
+ * Check if all columns are part of the REPLICA IDENTITY index or not.
+ *
+ * If the publication is FOR ALL TABLES then it means the table has no
+ * column list and we can skip the validation.
+ */
+ if (!pubform->puballtables &&
+ (pubform->pubupdate || pubform->pubdelete) &&
+ pub_collist_contains_invalid_column(pubid, relation, ancestors,
+ pubform->pubviaroot))
+ {
+ if (pubform->pubupdate)
+ pubdesc->cols_valid_for_update = false;
+ if (pubform->pubdelete)
+ pubdesc->cols_valid_for_delete = false;
+ }
+
ReleaseSysCache(tup);
/*
pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
!pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
break;
+
+ /*
+ * If we know everything is replicated and the column list is invalid
+ * for update and delete, there is no point to check for other
+ * publications.
+ */
+ if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
+ pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
+ !pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete)
+ break;
}
if (relation->rd_pubdesc)
int i_prpubid;
int i_prrelid;
int i_prrelqual;
+ int i_prattrs;
int i,
j,
ntups;
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
- "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
- "FROM pg_catalog.pg_publication_rel");
+ "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
+ "(CASE\n"
+ " WHEN pr.prattrs IS NOT NULL THEN\n"
+ " (SELECT array_agg(attname)\n"
+ " FROM\n"
+ " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
+ " ELSE NULL END) prattrs "
+ "FROM pg_catalog.pg_publication_rel pr");
else
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
- "NULL AS prrelqual "
+ "NULL AS prrelqual, NULL AS prattrs "
"FROM pg_catalog.pg_publication_rel");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
i_prrelqual = PQfnumber(res, "prrelqual");
+ i_prattrs = PQfnumber(res, "prattrs");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
else
pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
+ if (!PQgetisnull(res, i, i_prattrs))
+ {
+ char **attnames;
+ int nattnames;
+ PQExpBuffer attribs;
+
+ if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+ &attnames, &nattnames))
+ fatal("could not parse %s array", "prattrs");
+ attribs = createPQExpBuffer();
+ for (int k = 0; k < nattnames; k++)
+ {
+ if (k > 0)
+ appendPQExpBufferStr(attribs, ", ");
+
+ appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+ }
+ pubrinfo[j].pubrattrs = attribs->data;
+ }
+ else
+ pubrinfo[j].pubrattrs = NULL;
+
/* Decide whether we want to dump it */
selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
appendPQExpBuffer(query, " %s",
fmtQualifiedDumpable(tbinfo));
+ if (pubrinfo->pubrattrs)
+ appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+
if (pubrinfo->pubrelqual)
{
/*
PublicationInfo *publication;
TableInfo *pubtable;
char *pubrelqual;
+ char *pubrattrs;
} PublicationRelInfo;
/*
unlike => { exclude_dump_test_schema => 1, },
},
+ 'ALTER PUBLICATION pub1 ADD TABLE test_sixth_table (col3, col2)' => {
+ create_order => 52,
+ create_sql =>
+ 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_sixth_table (col3, col2);',
+ regexp => qr/^
+ \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_sixth_table (col2, col3);\E
+ /xm,
+ like => { %full_runs, section_post_data => 1, },
+ unlike => { exclude_dump_test_schema => 1, },
+ },
+
+ 'ALTER PUBLICATION pub1 ADD TABLE test_seventh_table (col3, col2) WHERE (col1 = 1)' => {
+ create_order => 52,
+ create_sql =>
+ 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_seventh_table (col3, col2) WHERE (col1 = 1);',
+ regexp => qr/^
+ \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_seventh_table (col2, col3) WHERE ((col1 = 1));\E
+ /xm,
+ like => { %full_runs, section_post_data => 1, },
+ unlike => { exclude_dump_test_schema => 1, },
+ },
+
'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test' => {
create_order => 51,
create_sql =>
unlike => { exclude_dump_test_schema => 1, },
},
+ 'CREATE TABLE test_sixth_table' => {
+ create_order => 6,
+ create_sql => 'CREATE TABLE dump_test.test_sixth_table (
+ col1 int,
+ col2 text,
+ col3 bytea
+ );',
+ regexp => qr/^
+ \QCREATE TABLE dump_test.test_sixth_table (\E
+ \n\s+\Qcol1 integer,\E
+ \n\s+\Qcol2 text,\E
+ \n\s+\Qcol3 bytea\E
+ \n\);
+ /xm,
+ like =>
+ { %full_runs, %dump_test_schema_runs, section_pre_data => 1, },
+ unlike => { exclude_dump_test_schema => 1, },
+ },
+
+ 'CREATE TABLE test_seventh_table' => {
+ create_order => 6,
+ create_sql => 'CREATE TABLE dump_test.test_seventh_table (
+ col1 int,
+ col2 text,
+ col3 bytea
+ );',
+ regexp => qr/^
+ \QCREATE TABLE dump_test.test_seventh_table (\E
+ \n\s+\Qcol1 integer,\E
+ \n\s+\Qcol2 text,\E
+ \n\s+\Qcol3 bytea\E
+ \n\);
+ /xm,
+ like =>
+ { %full_runs, %dump_test_schema_runs, section_pre_data => 1, },
+ unlike => { exclude_dump_test_schema => 1, },
+ },
+
'CREATE TABLE test_table_identity' => {
create_order => 3,
create_sql => 'CREATE TABLE dump_test.test_table_identity (
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
+ " , NULL\n"
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
" JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
"UNION\n"
"SELECT pubname\n"
" , pg_get_expr(pr.prqual, c.oid)\n"
+ " , (CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+ " (SELECT string_agg(attname, ', ')\n"
+ " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
+ " ELSE NULL END) "
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
" JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n"
"UNION\n"
"SELECT pubname\n"
" , NULL\n"
+ " , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
+ " , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
"WHERE pr.prrelid = '%s'\n"
"UNION ALL\n"
"SELECT pubname\n"
" , NULL\n"
+ " , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
printfPQExpBuffer(&buf, " \"%s\"",
PQgetvalue(result, i, 0));
+ /* column list (if any) */
+ if (!PQgetisnull(result, i, 2))
+ appendPQExpBuffer(&buf, " (%s)",
+ PQgetvalue(result, i, 2));
+
/* row filter (if any) */
if (!PQgetisnull(result, i, 1))
appendPQExpBuffer(&buf, " WHERE %s",
*/
static bool
addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
- bool singlecol, printTableContent *cont)
+ bool as_schema, printTableContent *cont)
{
PGresult *res;
int count = 0;
for (i = 0; i < count; i++)
{
- if (!singlecol)
+ if (as_schema)
+ printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
+ else
{
printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
+
+ if (!PQgetisnull(res, i, 3))
+ appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 3));
+
if (!PQgetisnull(res, i, 2))
appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2));
}
- else
- printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
printTableAddFooter(cont, buf->data);
}
printfPQExpBuffer(&buf,
"SELECT n.nspname, c.relname");
if (pset.sversion >= 150000)
+ {
appendPQExpBufferStr(&buf,
", pg_get_expr(pr.prqual, c.oid)");
+ appendPQExpBufferStr(&buf,
+ ", (CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+ " pg_catalog.array_to_string("
+ " ARRAY(SELECT attname\n"
+ " FROM\n"
+ " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+ " ELSE NULL END)");
+ }
else
appendPQExpBufferStr(&buf,
- ", NULL");
+ ", NULL, NULL");
appendPQExpBuffer(&buf,
"\nFROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
if (!puballsequences)
{
- /* Get the tables for the specified publication */
+ /* Get the sequences for the specified publication */
printfPQExpBuffer(&buf,
- "SELECT n.nspname, c.relname, NULL\n"
+ "SELECT n.nspname, c.relname, NULL, NULL\n"
"FROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
" pg_catalog.pg_publication_rel pr\n"
*/
bool rf_valid_for_update;
bool rf_valid_for_delete;
+
+ /*
+ * true if the columns are part of the replica identity or the publication actions
+ * do not include UPDATE or DELETE.
+ */
+ bool cols_valid_for_update;
+ bool cols_valid_for_delete;
} PublicationDesc;
typedef struct Publication
{
Relation relation;
Node *whereClause;
+ List *columns;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern void GetActionsInPublication(Oid pubid, PublicationActions *actions);
extern List *GetPublicationSchemas(Oid pubid, char objectType);
extern List *GetSchemaPublications(Oid schemaid, char objectType);
extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType,
char objectType,
bool if_not_exists);
+extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
+ MemoryContext mcxt);
+
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
#ifdef CATALOG_VARLEN /* variable-length fields start here */
pg_node_tree prqual; /* qualifications */
+ int2vector prattrs; /* columns to replicate */
#endif
} FormData_pg_publication_rel;
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
extern void InvalidatePublicationRels(List *relids);
-extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation,
+extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation,
+ List *ancestors, bool pubviaroot);
+extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation,
List *ancestors, bool pubviaroot);
#endif /* PUBLICATIONCMDS_H */
NodeTag type;
RangeVar *relation; /* relation to be published */
Node *whereClause; /* qualifications */
+ List *columns; /* List of columns in a publication table */
} PublicationTable;
/*
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel,
TupleTableSlot *newslot,
- bool binary);
+ bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel,
TupleTableSlot *oldslot,
- TupleTableSlot *newslot, bool binary);
+ TupleTableSlot *newslot, bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
bool is_called);
extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
- Relation rel);
+ Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
+-- fail - duplicate tables are not allowed if that table has any column lists
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert');
+ERROR: conflicting or redundant column lists for table "testpub_tbl1"
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert');
+ERROR: conflicting or redundant column lists for table "testpub_tbl1"
+RESET client_min_messages;
+-- test for column lists
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1;
+CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert');
+RESET client_min_messages;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text,
+ d int generated always as (a + length(b)) stored);
+-- error: column "x" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
+ERROR: column "x" of relation "testpub_tbl5" does not exist
+-- error: replica identity "a" not included in the column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
+UPDATE testpub_tbl5 SET a = 1;
+ERROR: cannot update table "testpub_tbl5"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- error: generated column "d" can't be in list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ERROR: cannot reference generated column "d" in publication column list
+-- error: system attributes "ctid" not allowed in column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
+ERROR: cannot reference system column "ctid" in publication column list
+-- ok
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
+ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice
+ERROR: cannot drop column c of table testpub_tbl5 because other objects depend on it
+DETAIL: publication of table testpub_tbl5 in publication testpub_fortable depends on column c of table testpub_tbl5
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+-- ok: for insert-only publication, any column list is acceptable
+ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c);
+/* not all replica identities are good enough */
+CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c);
+ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL;
+ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
+-- error: replica identity (b,c) is not covered by column list (a, c)
+UPDATE testpub_tbl5 SET a = 1;
+ERROR: cannot update table "testpub_tbl5"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- error: change the replica identity to "b", and column list to (a, c)
+-- then update fails, because (a, c) does not cover replica identity
+ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
+UPDATE testpub_tbl5 SET a = 1;
+ERROR: cannot update table "testpub_tbl5"
+DETAIL: Column list used by the publication does not cover the replica identity.
+/* But if upd/del are not published, it works OK */
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
+RESET client_min_messages;
+ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok
+\dRp+ testpub_table_ins
+ Publication testpub_table_ins
+ Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root
+--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+----------
+ regress_publication_user | f | f | t | f | f | t | f | f
+Tables:
+ "public.testpub_tbl5" (a)
+
+-- tests with REPLICA IDENTITY FULL
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);
+UPDATE testpub_tbl6 SET a = 1;
+ERROR: cannot update table "testpub_tbl6"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+UPDATE testpub_tbl6 SET a = 1;
+-- make sure changing the column list is propagated to the catalog
+CREATE TABLE testpub_tbl7 (a int primary key, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b);
+\d+ testpub_tbl7
+ Table "public.testpub_tbl7"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+---------+----------+--------------+-------------
+ a | integer | | not null | | plain | |
+ b | text | | | | extended | |
+ c | text | | | | extended | |
+Indexes:
+ "testpub_tbl7_pkey" PRIMARY KEY, btree (a)
+Publications:
+ "testpub_fortable" (a, b)
+
+-- ok: the column list is the same, we should skip this table (or at least not fail)
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b);
+\d+ testpub_tbl7
+ Table "public.testpub_tbl7"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+---------+----------+--------------+-------------
+ a | integer | | not null | | plain | |
+ b | text | | | | extended | |
+ c | text | | | | extended | |
+Indexes:
+ "testpub_tbl7_pkey" PRIMARY KEY, btree (a)
+Publications:
+ "testpub_fortable" (a, b)
+
+-- ok: the column list changes, make sure the catalog gets updated
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c);
+\d+ testpub_tbl7
+ Table "public.testpub_tbl7"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+---------+----------+--------------+-------------
+ a | integer | | not null | | plain | |
+ b | text | | | | extended | |
+ c | text | | | | extended | |
+Indexes:
+ "testpub_tbl7_pkey" PRIMARY KEY, btree (a)
+Publications:
+ "testpub_fortable" (a, c)
+
+-- column list for partitioned tables has to cover replica identities for
+-- all child relations
+CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
+-- first partition has replica identity "a"
+CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0);
+ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
+-- second partition has replica identity "b"
+CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1);
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+-- ok: column list covers both "a" and "b"
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true');
+RESET client_min_messages;
+-- ok: the same thing, but try plain ADD TABLE
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+UPDATE testpub_tbl8 SET a = 1;
+-- failure: column list does not cover replica identity for the second partition
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+-- failure: one of the partitions has REPLICA IDENTITY FULL
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+-- add table and then try changing replica identity
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+-- failure: replica identity full can't be used with a column list
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+-- failure: replica identity has to be covered by the column list
+ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey;
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+DROP TABLE testpub_tbl8;
+-- column list for partitioned tables has to cover replica identities for
+-- all child relations
+CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+-- first partition has replica identity "a"
+CREATE TABLE testpub_tbl8_0 (a int, b text, c text);
+ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
+-- second partition has replica identity "b"
+CREATE TABLE testpub_tbl8_1 (a int, b text, c text);
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+-- ok: attaching first partition works, because (a) is in column list
+ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0);
+-- failure: second partition has replica identity (c), which si not in column list
+ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1);
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+-- failure: changing replica identity to FULL for partition fails, because
+-- of the column list on the parent
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL;
+UPDATE testpub_tbl8 SET a = 1;
+ERROR: cannot update table "testpub_tbl8_0"
+DETAIL: Column list used by the publication does not cover the replica identity.
+DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1;
+DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list;
+-- ======================================================
+-- Test combination of column list and row filter
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_both_filters;
+RESET client_min_messages;
+CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
+ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
+ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
+\dRp+ testpub_both_filters
+ Publication testpub_both_filters
+ Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root
+--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+----------
+ regress_publication_user | f | f | t | t | t | t | t | f
+Tables:
+ "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
+
+\d+ testpub_tbl_both_filters
+ Table "public.testpub_tbl_both_filters"
+ Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ a | integer | | not null | | plain | |
+ b | integer | | | | plain | |
+ c | integer | | not null | | plain | |
+Indexes:
+ "testpub_tbl_both_filters_pkey" PRIMARY KEY, btree (a, c) REPLICA IDENTITY
+Publications:
+ "testpub_both_filters" (a, c) WHERE (c <> 1)
+
+DROP TABLE testpub_tbl_both_filters;
+DROP PUBLICATION testpub_both_filters;
+-- ======================================================
+-- More column list tests for validating column references
+CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
+CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
+CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
+CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
+ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
+-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
+-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b);
+RESET client_min_messages;
+-- ok - (a,b) coverts all PK cols
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c);
+-- ok - (a,b,c) coverts all PK cols
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- fail - "b" is missing from the column list
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b);
+-- fail - "a" is missing from the column list
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
+-- ok - there's no replica identity, so any column list works
+-- note: it fails anyway, just a bit later because UPDATE requires RI
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+-- Case 2. REPLICA IDENTITY FULL
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
+-- fail - with REPLICA IDENTITY FULL no column list is allowed
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d);
+-- fail - with REPLICA IDENTITY FULL no column list is allowed
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_nopk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+-- Case 3. REPLICA IDENTITY NOTHING
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+-- Case 4. REPLICA IDENTITY INDEX
+ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
+CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
+ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
+CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_pk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
+-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
+-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_nopk"
+DETAIL: Column list used by the publication does not cover the replica identity.
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c);
+-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+-- Tests for partitioned table
+-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned
+-- table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+-- fail - cannot use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
+ERROR: cannot use publication column list for relation "rf_tbl_abcd_part_pk"
+DETAIL: column list cannot be used for a partitioned table when publish_via_partition_root is false.
+-- ok - can use column list for partition
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a);
+-- ok - "a" is a PK col
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned
+-- table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
+-- ok - can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
+-- ok - "a" is a PK col
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is
+-- used for partitioned table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+ERROR: cannot set publish_via_partition_root = false for publication "testpub6"
+DETAIL: The publication contains a column list for a partitioned table "rf_tbl_abcd_part_pk" which is not allowed when publish_via_partition_root is false.
+-- Now change the root column list to use a column "b"
+-- (which is not in the replica identity)
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b);
+-- ok - we don't have column list for partitioned table.
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+-- fail - "b" is not in REPLICA IDENTITY INDEX
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+-- set PUBLISH_VIA_PARTITION_ROOT to true
+-- can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
+-- ok - can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b);
+-- fail - "b" is not in REPLICA IDENTITY INDEX
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
+DETAIL: Column list used by the publication does not cover the replica identity.
+DROP PUBLICATION testpub6;
+DROP TABLE rf_tbl_abcd_pk;
+DROP TABLE rf_tbl_abcd_nopk;
+DROP TABLE rf_tbl_abcd_part_pk;
+-- ======================================================
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);
Tables from schemas:
"pub_test1"
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR: syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR: column specification not allowed for schema
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+ ^
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
+-- fail - duplicate tables are not allowed if that table has any column lists
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert');
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert');
+RESET client_min_messages;
+
+-- test for column lists
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1;
+CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert');
+RESET client_min_messages;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text,
+ d int generated always as (a + length(b)) stored);
+-- error: column "x" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
+-- error: replica identity "a" not included in the column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
+UPDATE testpub_tbl5 SET a = 1;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- error: generated column "d" can't be in list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+-- error: system attributes "ctid" not allowed in column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
+-- ok
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
+ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice
+-- ok: for insert-only publication, any column list is acceptable
+ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c);
+
+/* not all replica identities are good enough */
+CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c);
+ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL;
+ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
+-- error: replica identity (b,c) is not covered by column list (a, c)
+UPDATE testpub_tbl5 SET a = 1;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+
+-- error: change the replica identity to "b", and column list to (a, c)
+-- then update fails, because (a, c) does not cover replica identity
+ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
+UPDATE testpub_tbl5 SET a = 1;
+
+/* But if upd/del are not published, it works OK */
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
+RESET client_min_messages;
+ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok
+\dRp+ testpub_table_ins
+
+-- tests with REPLICA IDENTITY FULL
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);
+UPDATE testpub_tbl6 SET a = 1;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6;
+
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+UPDATE testpub_tbl6 SET a = 1;
+
+-- make sure changing the column list is propagated to the catalog
+CREATE TABLE testpub_tbl7 (a int primary key, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b);
+\d+ testpub_tbl7
+-- ok: the column list is the same, we should skip this table (or at least not fail)
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b);
+\d+ testpub_tbl7
+-- ok: the column list changes, make sure the catalog gets updated
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c);
+\d+ testpub_tbl7
+
+-- column list for partitioned tables has to cover replica identities for
+-- all child relations
+CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
+-- first partition has replica identity "a"
+CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0);
+ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
+-- second partition has replica identity "b"
+CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1);
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+
+-- ok: column list covers both "a" and "b"
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true');
+RESET client_min_messages;
+
+-- ok: the same thing, but try plain ADD TABLE
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+UPDATE testpub_tbl8 SET a = 1;
+
+-- failure: column list does not cover replica identity for the second partition
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
+UPDATE testpub_tbl8 SET a = 1;
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+
+-- failure: one of the partitions has REPLICA IDENTITY FULL
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
+UPDATE testpub_tbl8 SET a = 1;
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
+
+-- add table and then try changing replica identity
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+
+-- failure: replica identity full can't be used with a column list
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
+UPDATE testpub_tbl8 SET a = 1;
+
+-- failure: replica identity has to be covered by the column list
+ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey;
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+UPDATE testpub_tbl8 SET a = 1;
+
+DROP TABLE testpub_tbl8;
+
+-- column list for partitioned tables has to cover replica identities for
+-- all child relations
+CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
+ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
+-- first partition has replica identity "a"
+CREATE TABLE testpub_tbl8_0 (a int, b text, c text);
+ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
+-- second partition has replica identity "b"
+CREATE TABLE testpub_tbl8_1 (a int, b text, c text);
+ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
+ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
+
+-- ok: attaching first partition works, because (a) is in column list
+ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0);
+-- failure: second partition has replica identity (c), which si not in column list
+ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1);
+UPDATE testpub_tbl8 SET a = 1;
+
+-- failure: changing replica identity to FULL for partition fails, because
+-- of the column list on the parent
+ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL;
+UPDATE testpub_tbl8 SET a = 1;
+
+DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1;
+DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list;
+-- ======================================================
+
+-- Test combination of column list and row filter
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_both_filters;
+RESET client_min_messages;
+CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
+ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
+ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
+\dRp+ testpub_both_filters
+\d+ testpub_tbl_both_filters
+
+DROP TABLE testpub_tbl_both_filters;
+DROP PUBLICATION testpub_both_filters;
+-- ======================================================
+
+-- More column list tests for validating column references
+CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
+CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
+CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
+CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
+ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
+
+-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
+
+-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b);
+RESET client_min_messages;
+-- ok - (a,b) coverts all PK cols
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c);
+-- ok - (a,b,c) coverts all PK cols
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- fail - "b" is missing from the column list
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b);
+-- fail - "a" is missing from the column list
+UPDATE rf_tbl_abcd_pk SET a = 1;
+
+-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
+-- ok - there's no replica identity, so any column list works
+-- note: it fails anyway, just a bit later because UPDATE requires RI
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+
+-- Case 2. REPLICA IDENTITY FULL
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
+-- fail - with REPLICA IDENTITY FULL no column list is allowed
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d);
+-- fail - with REPLICA IDENTITY FULL no column list is allowed
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+
+-- Case 3. REPLICA IDENTITY NOTHING
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d);
+-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
+-- it still fails later because without RI we can't replicate updates
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+
+-- Case 4. REPLICA IDENTITY INDEX
+ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
+CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
+ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
+ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
+CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
+ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
+-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
+-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_pk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
+-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c);
+-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
+UPDATE rf_tbl_abcd_nopk SET a = 1;
+
+-- Tests for partitioned table
+
+-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned
+-- table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+-- fail - cannot use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
+-- ok - can use column list for partition
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a);
+-- ok - "a" is a PK col
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned
+-- table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
+-- ok - can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
+-- ok - "a" is a PK col
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is
+-- used for partitioned table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+-- Now change the root column list to use a column "b"
+-- (which is not in the replica identity)
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b);
+-- ok - we don't have column list for partitioned table.
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
+-- fail - "b" is not in REPLICA IDENTITY INDEX
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+-- set PUBLISH_VIA_PARTITION_ROOT to true
+-- can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
+-- ok - can use column list for partitioned table
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b);
+-- fail - "b" is not in REPLICA IDENTITY INDEX
+UPDATE rf_tbl_abcd_part_pk SET a = 1;
+
+DROP PUBLICATION testpub6;
+DROP TABLE rf_tbl_abcd_pk;
+DROP TABLE rf_tbl_abcd_nopk;
+DROP TABLE rf_tbl_abcd_part_pk;
+-- ======================================================
+
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
\dRp+ testpub1_forschema
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
--- /dev/null
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test partial-column publication of tables
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+sub wait_for_subscription_sync
+{
+ my ($node) = @_;
+
+ # Also wait for initial table sync to finish
+ my $synced_query = "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+
+ $node->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+}
+
+# setup tables on both nodes
+
+# tab1: simple 1:1 replication
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int)
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int)
+));
+
+# tab2: replication from regular to table with fewer columns
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)
+));
+
+# tab3: simple 1:1 replication with weird column names
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab3 ("a'" int PRIMARY KEY, "B" varchar, "c'" int)
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab3 ("a'" int PRIMARY KEY, "c'" int)
+));
+
+# test_part: partitioned tables, with partitioning (including multi-level
+# partitioning, and fewer columns on the subscriber)
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a);
+ CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6);
+ CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a);
+ CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a);
+ CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6);
+ CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a);
+ CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10);
+));
+
+# tab4: table with user-defined enum types
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TYPE test_typ AS ENUM ('blue', 'red');
+ CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TYPE test_typ AS ENUM ('blue', 'red');
+ CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, d text);
+));
+
+
+# TEST: create publication and subscription for some of the tables with
+# column lists
+$node_publisher->safe_psql('postgres', qq(
+ CREATE PUBLICATION pub1
+ FOR TABLE tab1 (a, "B"), tab3 ("a'", "c'"), test_part (a, b), tab4 (a, b, d)
+ WITH (publish_via_partition_root = 'true');
+));
+
+# check that we got the right prattrs values for the publication in the
+# pg_publication_rel catalog (order by relname, to get stable ordering)
+my $result = $node_publisher->safe_psql('postgres', qq(
+ SELECT relname, prattrs
+ FROM pg_publication_rel pb JOIN pg_class pc ON(pb.prrelid = pc.oid)
+ ORDER BY relname
+));
+
+is($result, qq(tab1|1 2
+tab3|1 3
+tab4|1 2 4
+test_part|1 2), 'publication relation updated');
+
+# TEST: insert data into the tables, create subscription and see if sync
+# replicates the right columns
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab1 VALUES (1, 2, 3);
+ INSERT INTO tab1 VALUES (4, 5, 6);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab3 VALUES (1, 2, 3);
+ INSERT INTO tab3 VALUES (4, 5, 6);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab4 VALUES (1, 'red', 3, 'oh my');
+ INSERT INTO tab4 VALUES (2, 'blue', 4, 'hello');
+));
+
+# replication of partitioned table
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part VALUES (1, 'abc', '2021-07-04 12:00:00');
+ INSERT INTO test_part VALUES (2, 'bcd', '2021-07-03 11:12:13');
+ INSERT INTO test_part VALUES (7, 'abc', '2021-07-04 12:00:00');
+ INSERT INTO test_part VALUES (8, 'bcd', '2021-07-03 11:12:13');
+));
+
+# create subscription for the publication, wait for sync to complete,
+# then check the sync results
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+# tab1: only (a,b) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1 ORDER BY a");
+is($result, qq(1|2|
+4|5|), 'insert on column tab1.c is not replicated');
+
+# tab3: only (a,c) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT * FROM tab3 ORDER BY "a'"));
+is($result, qq(1|3
+4|6), 'insert on column tab3.b is not replicated');
+
+# tab4: only (a,b,d) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab4 ORDER BY a");
+is($result, qq(1|red|oh my
+2|blue|hello), 'insert on column tab4.c is not replicated');
+
+# test_part: (a,b) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_part ORDER BY a");
+is($result, qq(1|abc
+2|bcd
+7|abc
+8|bcd), 'insert on column test_part.c columns is not replicated');
+
+
+# TEST: now insert more data into the tables, and wait until we replicate
+# them (not by tablesync, but regular decoding and replication)
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab1 VALUES (2, 3, 4);
+ INSERT INTO tab1 VALUES (5, 6, 7);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab3 VALUES (2, 3, 4);
+ INSERT INTO tab3 VALUES (5, 6, 7);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab4 VALUES (3, 'red', 5, 'foo');
+ INSERT INTO tab4 VALUES (4, 'blue', 6, 'bar');
+));
+
+# replication of partitioned table
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part VALUES (3, 'xxx', '2022-02-01 10:00:00');
+ INSERT INTO test_part VALUES (4, 'yyy', '2022-03-02 15:12:13');
+ INSERT INTO test_part VALUES (9, 'zzz', '2022-04-03 21:00:00');
+ INSERT INTO test_part VALUES (10, 'qqq', '2022-05-04 22:12:13');
+));
+
+# wait for catchup before checking the subscriber
+$node_publisher->wait_for_catchup('sub1');
+
+# tab1: only (a,b) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1 ORDER BY a");
+is($result, qq(1|2|
+2|3|
+4|5|
+5|6|), 'insert on column tab1.c is not replicated');
+
+# tab3: only (a,c) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT * FROM tab3 ORDER BY "a'"));
+is($result, qq(1|3
+2|4
+4|6
+5|7), 'insert on column tab3.b is not replicated');
+
+# tab4: only (a,b,d) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab4 ORDER BY a");
+is($result, qq(1|red|oh my
+2|blue|hello
+3|red|foo
+4|blue|bar), 'insert on column tab4.c is not replicated');
+
+# test_part: (a,b) is replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_part ORDER BY a");
+is($result, qq(1|abc
+2|bcd
+3|xxx
+4|yyy
+7|abc
+8|bcd
+9|zzz
+10|qqq), 'insert on column test_part.c columns is not replicated');
+
+
+# TEST: do some updates on some of the tables, both on columns included
+# in the column list and other
+
+# tab1: update of replicated column
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab1 SET "B" = 2 * "B" where a = 1));
+
+# tab1: update of non-replicated column
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab1 SET c = 2*c where a = 4));
+
+# tab3: update of non-replicated
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab3 SET "B" = "B" || ' updated' where "a'" = 4));
+
+# tab3: update of replicated column
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab3 SET "c'" = 2 * "c'" where "a'" = 1));
+
+# tab4
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab4 SET b = 'blue', c = c * 2, d = d || ' updated' where a = 1));
+
+# tab4
+$node_publisher->safe_psql('postgres',
+ qq(UPDATE tab4 SET b = 'red', c = c * 2, d = d || ' updated' where a = 2));
+
+# wait for the replication to catch up, and check the UPDATE results got
+# replicated correctly, with the right column list
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT * FROM tab1 ORDER BY a));
+is($result,
+qq(1|4|
+2|3|
+4|5|
+5|6|), 'only update on column tab1.b is replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT * FROM tab3 ORDER BY "a'"));
+is($result,
+qq(1|6
+2|4
+4|6
+5|7), 'only update on column tab3.c is replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT * FROM tab4 ORDER BY a));
+
+is($result, qq(1|blue|oh my updated
+2|red|hello updated
+3|red|foo
+4|blue|bar), 'update on column tab4.c is not replicated');
+
+
+# TEST: add table with a column list, insert data, replicate
+
+# insert some data before adding it to the publication
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab2 VALUES (1, 'abc', 3);
+));
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub1 ADD TABLE tab2 (a, b)");
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION");
+
+# wait for the tablesync to complete, add a bit more data and then check
+# the results of the replication
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab2 VALUES (2, 'def', 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab2 ORDER BY a");
+is($result, qq(1|abc
+2|def), 'insert on column tab2.c is not replicated');
+
+# do a couple updates, check the correct stuff gets replicated
+$node_publisher->safe_psql('postgres', qq(
+ UPDATE tab2 SET c = 5 where a = 1;
+ UPDATE tab2 SET b = 'xyz' where a = 2;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab2 ORDER BY a");
+is($result, qq(1|abc
+2|xyz), 'update on column tab2.c is not replicated');
+
+
+# TEST: add a table to two publications with different column lists, and
+# create a single subscription replicating both publications
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int);
+ CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b);
+ CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d);
+
+ -- insert a couple initial records
+ INSERT INTO tab5 VALUES (1, 11, 111, 1111);
+ INSERT INTO tab5 VALUES (2, 22, 222, 2222);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->wait_for_catchup('sub1');
+
+# insert data and make sure all the columns (union of the columns lists)
+# get fully replicated
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab5 VALUES (3, 33, 333, 3333);
+ INSERT INTO tab5 VALUES (4, 44, 444, 4444);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"),
+ qq(1|11|1111
+2|22|2222
+3|33|3333
+4|44|4444),
+ 'overlapping publications with overlapping column lists');
+
+# and finally, remove the column list for one of the publications, which
+# means replicating all columns (removing the column list), but first add
+# the missing column to the table on subscriber
+$node_publisher->safe_psql('postgres', qq(
+ ALTER PUBLICATION pub3 SET TABLE tab5;
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
+ ALTER TABLE tab5 ADD COLUMN c INT;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab5 VALUES (5, 55, 555, 5555);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"),
+ qq(1|11|1111|
+2|22|2222|
+3|33|3333|
+4|44|4444|
+5|55|5555|555),
+ 'overlapping publications with overlapping column lists');
+
+# TEST: create a table with a column list, then change the replica
+# identity by replacing a primary key (but use a different column in
+# the column list)
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int);
+ CREATE PUBLICATION pub4 FOR TABLE tab6 (a, b);
+
+ -- initial data
+ INSERT INTO tab6 VALUES (1, 22, 333, 4444);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab6 VALUES (2, 33, 444, 5555);
+ UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"),
+ qq(1|44||
+2|66||), 'replication with the original primary key');
+
+# now redefine the constraint - move the primary key to a different column
+# (which is still covered by the column list, though)
+
+$node_publisher->safe_psql('postgres', qq(
+ ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey;
+ ALTER TABLE tab6 ADD PRIMARY KEY (b);
+));
+
+# we need to do the same thing on the subscriber
+# XXX What would happen if this happens before the publisher ALTER? Or
+# interleaved, somehow? But that seems unrelated to column lists.
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey;
+ ALTER TABLE tab6 ADD PRIMARY KEY (b);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab6 VALUES (3, 55, 666, 8888);
+ UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"),
+ qq(1|88||
+2|132||
+3|110||),
+ 'replication with the modified primary key');
+
+
+# TEST: create a table with a column list, then change the replica
+# identity by replacing a primary key with a key on multiple columns
+# (all of them covered by the column list)
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int);
+ CREATE PUBLICATION pub5 FOR TABLE tab7 (a, b);
+
+ -- some initial data
+ INSERT INTO tab7 VALUES (1, 22, 333, 4444);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab7 VALUES (2, 33, 444, 5555);
+ UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"),
+ qq(1|44||
+2|66||), 'replication with the original primary key');
+
+# now redefine the constraint - move the primary key to a different column
+# (which is not covered by the column list)
+$node_publisher->safe_psql('postgres', qq(
+ ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey;
+ ALTER TABLE tab7 ADD PRIMARY KEY (a, b);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO tab7 VALUES (3, 55, 666, 7777);
+ UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"),
+ qq(1|88||
+2|132||
+3|110||),
+ 'replication with the modified primary key');
+
+# now switch the primary key again to another columns not covered by the
+# column list, but also generate writes between the drop and creation
+# of the new constraint
+
+$node_publisher->safe_psql('postgres', qq(
+ ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey;
+ INSERT INTO tab7 VALUES (4, 77, 888, 9999);
+ -- update/delete is not allowed for tables without RI
+ ALTER TABLE tab7 ADD PRIMARY KEY (b, a);
+ UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
+ DELETE FROM tab7 WHERE a = 1;
+));
+
+$node_publisher->safe_psql('postgres', qq(
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"),
+ qq(2|264||
+3|220||
+4|154||),
+ 'replication with the modified primary key');
+
+
+# TEST: partitioned tables (with publish_via_partition_root = false)
+# and replica identity. The (leaf) partitions may have different RI, so
+# we need to check the partition RI (with respect to the column list)
+# while attaching the partition.
+
+# First, let's create a partitioned table with two partitions, each with
+# a different RI, but a column list not covering all those RI.
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_part_a (a int, b int, c int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;
+
+ CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;
+
+ -- initial data, one row in each partition
+ INSERT INTO test_part_a VALUES (1, 3);
+ INSERT INTO test_part_a VALUES (6, 4);
+));
+
+# do the same thing on the subscriber (with the opposite column order)
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_part_a (b int, a int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;
+
+ CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;
+));
+
+# create a publication replicating just the column "a", which is not enough
+# for the second partition
+$node_publisher->safe_psql('postgres', qq(
+ CREATE PUBLICATION pub6 FOR TABLE test_part_a (b, a) WITH (publish_via_partition_root = true);
+ ALTER PUBLICATION pub6 ADD TABLE test_part_a_1 (a);
+ ALTER PUBLICATION pub6 ADD TABLE test_part_a_2 (b);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part_a VALUES (2, 5);
+ INSERT INTO test_part_a VALUES (7, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT a, b FROM test_part_a ORDER BY a, b"),
+ qq(1|3
+2|5
+6|4
+7|6),
+ 'partitions with different replica identities not replicated correctly');
+
+# This time start with a column list covering RI for all partitions, but
+# then update the column list to not cover column "b" (needed by the
+# second partition)
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey;
+
+ CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey;
+
+ -- initial data, one row in each partitions
+ INSERT INTO test_part_b VALUES (1, 1);
+ INSERT INTO test_part_b VALUES (6, 2);
+));
+
+# do the same thing on the subscriber
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey;
+
+ CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey;
+));
+
+# create a publication replicating both columns, which is sufficient for
+# both partitions
+$node_publisher->safe_psql('postgres', qq(
+ CREATE PUBLICATION pub7 FOR TABLE test_part_b (a, b) WITH (publish_via_partition_root = true);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part_b VALUES (2, 3);
+ INSERT INTO test_part_b VALUES (7, 4);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_b ORDER BY a, b"),
+ qq(1|1
+2|3
+6|2
+7|4),
+ 'partitions with different replica identities not replicated correctly');
+
+
+# TEST: This time start with a column list covering RI for all partitions,
+# but then update RI for one of the partitions to not be covered by the
+# column list anymore.
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
+ ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;
+
+ CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
+ ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;
+
+ -- initial data, one row for each partition
+ INSERT INTO test_part_c VALUES (1, 3, 5);
+ INSERT INTO test_part_c VALUES (2, 4, 6);
+));
+
+# do the same thing on the subscriber
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
+ ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;
+
+ CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
+ ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
+ ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;
+));
+
+# create a publication replicating data through partition root, with a column
+# list on the root, and then add the partitions one by one with separate
+# column lists (but those are not applied)
+$node_publisher->safe_psql('postgres', qq(
+ CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
+ ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a,c);
+ ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+ DROP SUBSCRIPTION sub1;
+ CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part_c VALUES (3, 7, 8);
+ INSERT INTO test_part_c VALUES (4, 9, 10);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"),
+ qq(1||5
+2|4|
+3||8
+4|9|),
+ 'partitions with different replica identities not replicated correctly');
+
+
+# create a publication not replicating data through partition root, without
+# a column list on the root, and then add the partitions one by one with
+# separate column lists
+$node_publisher->safe_psql('postgres', qq(
+ DROP PUBLICATION pub8;
+ CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
+ ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a);
+ ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
+ TRUNCATE test_part_c;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ TRUNCATE test_part_c;
+ INSERT INTO test_part_c VALUES (1, 3, 5);
+ INSERT INTO test_part_c VALUES (2, 4, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"),
+ qq(1||
+2|4|),
+ 'partitions with different replica identities not replicated correctly');
+
+
+# TEST: Start with a single partition, with RI compatible with the column
+# list, and then attach a partition with incompatible RI.
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
+ ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;
+
+ INSERT INTO test_part_d VALUES (1, 2);
+));
+
+# do the same thing on the subscriber (in fact, create both partitions right
+# away, no need to delay that)
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);
+
+ CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
+ ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;
+
+ CREATE TABLE test_part_d_2 PARTITION OF test_part_d FOR VALUES IN (2,4);
+ ALTER TABLE test_part_d_2 ADD PRIMARY KEY (a);
+ ALTER TABLE test_part_d_2 REPLICA IDENTITY USING INDEX test_part_d_2_pkey;
+));
+
+# create a publication replicating both columns, which is sufficient for
+# both partitions
+$node_publisher->safe_psql('postgres', qq(
+ CREATE PUBLICATION pub9 FOR TABLE test_part_d (a) WITH (publish_via_partition_root = true);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_part_d VALUES (3, 4);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_d ORDER BY a, b"),
+ qq(1|
+3|),
+ 'partitions with different replica identities not replicated correctly');
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. So with column lists (a,b) and (a,c) we
+# should replicate (a,b,c).
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+ CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
+ CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
+
+ -- initial data
+ INSERT INTO test_mix_1 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_mix_1 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_1 ORDER BY a"),
+ qq(1|2|3
+4|5|6),
+ 'a mix of publications should use a union of column list');
+
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. If any of the publications is FOR ALL
+# TABLES, we should replicate all columns.
+
+# drop unnecessary tables, so as not to interfere with the FOR ALL TABLES
+$node_publisher->safe_psql('postgres', qq(
+ DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1,
+ test_part, test_part_a, test_part_b, test_part_c, test_part_d;
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
+ CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b);
+ CREATE PUBLICATION pub_mix_4 FOR ALL TABLES;
+
+ -- initial data
+ INSERT INTO test_mix_2 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_3, pub_mix_4;
+ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_mix_2 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_2"),
+ qq(1|2|3
+4|5|6),
+ 'a mix of publications should use a union of column list');
+
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. If any of the publications is FOR ALL
+# TABLES IN SCHEMA, we should replicate all columns.
+
+$node_subscriber->safe_psql('postgres', qq(
+ DROP SUBSCRIPTION sub1;
+ CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ DROP TABLE test_mix_2;
+ CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
+ CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b);
+ CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public;
+
+ -- initial data
+ INSERT INTO test_mix_3 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_mix_3 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_3"),
+ qq(1|2|3
+4|5|6),
+ 'a mix of publications should use a union of column list');
+
+
+# TEST: Check handling of publish_via_partition_root - if a partition is
+# published through partition root, we should only apply the column list
+# defined for the whole table (not the partitions) - both during the initial
+# sync and when replicating changes. This is what we do for row filters.
+
+$node_subscriber->safe_psql('postgres', qq(
+ DROP SUBSCRIPTION sub1;
+
+ CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
+ CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
+ CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
+
+ CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+
+ -- initial data
+ INSERT INTO test_root VALUES (1, 2, 3);
+ INSERT INTO test_root VALUES (10, 20, 30);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_root VALUES (2, 3, 4);
+ INSERT INTO test_root VALUES (11, 21, 31);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_root ORDER BY a, b, c"),
+ qq(1||
+2||
+10||
+11||),
+ 'publication via partition root applies column list');
+
+
+# TEST: Multiple publications which publish schema of parent table and
+# partition. The partition is published through two publications, once
+# through a schema (so no column list) containing the parent, and then
+# also directly (with a columns list). The expected outcome is there is
+# no column list.
+
+$node_publisher->safe_psql('postgres', qq(
+ DROP PUBLICATION pub1, pub2, pub3, pub4, pub5, pub6, pub7, pub8;
+
+ CREATE SCHEMA s1;
+ CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
+
+ CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1;
+ CREATE PUBLICATION pub2 FOR TABLE t_1(b);
+
+ -- initial data
+ INSERT INTO s1.t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE SCHEMA s1;
+ CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
+
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO s1.t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"),
+ qq(1|2|3
+4|5|6),
+ 'two publications, publishing the same relation');
+
+# Now resync the subcription, but with publications in the opposite order.
+# The result should be the same.
+
+$node_subscriber->safe_psql('postgres', qq(
+ TRUNCATE s1.t;
+
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO s1.t VALUES (7, 8, 9);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"),
+ qq(7|8|9),
+ 'two publications, publishing the same relation');
+
+
+# TEST: One publication, containing both the parent and child relations.
+# The expected outcome is list "a", because that's the column list defined
+# for the top-most ancestor added to the publication.
+
+$node_publisher->safe_psql('postgres', qq(
+ DROP SCHEMA s1 CASCADE;
+ CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+ PARTITION BY RANGE (a);
+ CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+ CREATE PUBLICATION pub3 FOR TABLE t_1 (a), t_2
+ WITH (PUBLISH_VIA_PARTITION_ROOT);
+
+ -- initial data
+ INSERT INTO t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ DROP SCHEMA s1 CASCADE;
+ CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+ PARTITION BY RANGE (a);
+ CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"),
+ qq(1||
+4||),
+ 'publication containing both parent and child relation');
+
+
+# TEST: One publication, containing both the parent and child relations.
+# The expected outcome is list "a", because that's the column list defined
+# for the top-most ancestor added to the publication.
+# Note: The difference from the preceding test is that in this case both
+# relations have a column list defined.
+
+$node_publisher->safe_psql('postgres', qq(
+ DROP TABLE t;
+ CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+ PARTITION BY RANGE (a);
+ CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+ CREATE PUBLICATION pub4 FOR TABLE t_1 (a), t_2 (b)
+ WITH (PUBLISH_VIA_PARTITION_ROOT);
+
+ -- initial data
+ INSERT INTO t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ DROP TABLE t;
+ CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+ CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+ PARTITION BY RANGE (a);
+ CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+ ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"),
+ qq(1||
+4||),
+ 'publication containing both parent and child relation');
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();