Check relkind of tables in CREATE/ALTER SUBSCRIPTION
authorPeter Eisentraut <peter_e@gmx.net>
Wed, 17 May 2017 02:57:16 +0000 (22:57 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Wed, 17 May 2017 02:57:16 +0000 (22:57 -0400)
We used to only check for a supported relkind on the subscriber during
replication, which is needed to ensure that the setup is valid and we
don't crash.  But it's also useful to tell the user immediately when
CREATE or ALTER SUBSCRIPTION is executed that the relation being added
to the subscription is not of a supported relkind.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: tushar <tushar.ahuja@enterprisedb.com>
src/backend/commands/subscriptioncmds.c
src/backend/executor/execReplication.c
src/backend/replication/logical/relation.c
src/include/executor/executor.h

index b80af275da5ce5948e84515da9c78bca55bc0ff2..265f2efd622683ee3b77a12c7dc217b2bb85f7cc 100644 (file)
@@ -33,6 +33,8 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 
+#include "executor/executor.h"
+
 #include "nodes/makefuncs.h"
 
 #include "replication/logicallauncher.h"
@@ -417,6 +419,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
                relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
+               /* Check for supported relkind. */
+               CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                        rv->schemaname, rv->relname);
+
                SetSubscriptionRelState(subid, relid, table_state,
                                        InvalidXLogRecPtr);
            }
@@ -529,6 +535,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
        Oid         relid;
 
        relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+       /* Check for supported relkind. */
+       CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                rv->schemaname, rv->relname);
+
        pubrel_local_oids[off++] = relid;
 
        if (!bsearch(&relid, subrel_local_oids,
index 327a0bad3886f6d2eba88c874f2d6fed13c80395..6af8018b71198d020785c89020c7c9d2a05c626f 100644 (file)
@@ -551,3 +551,23 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
                        RelationGetRelationName(rel)),
                 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
 }
+
+
+/*
+ * Check if we support writing into specific relkind.
+ *
+ * The nspname and relname are only needed for error reporting.
+ */
+void
+CheckSubscriptionRelkind(char relkind, const char *nspname,
+                        const char *relname)
+{
+   /*
+    * We currently only support writing to regular tables.
+    */
+   if (relkind != RELKIND_RELATION)
+       ereport(ERROR,
+               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                errmsg("logical replication target relation \"%s.%s\" is not a table",
+                       nspname, relname)));
+}
index 7c93bfb80a050b4b38e27fb1666aa2f1215a5588..590355a846e6a3f713b6d3aa78dc6ab5310644bf 100644 (file)
@@ -20,6 +20,7 @@
 #include "access/sysattr.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_subscription_rel.h"
+#include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
@@ -258,15 +259,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
                            remoterel->nspname, remoterel->relname)));
        entry->localrel = heap_open(relid, NoLock);
 
-       /*
-        * We currently only support writing to regular and partitioned
-        * tables.
-        */
-       if (entry->localrel->rd_rel->relkind != RELKIND_RELATION)
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("logical replication target relation \"%s.%s\" is not a table",
-                           remoterel->nspname, remoterel->relname)));
+       /* Check for supported relkind. */
+       CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
+                                remoterel->nspname, remoterel->relname);
 
        /*
         * Build the mapping of local attribute numbers to remote attribute
index 4f19579ee0b701d27adead35b47224c464453d75..ab61d35a86364c960094e38c627e377ee70e7208 100644 (file)
@@ -535,5 +535,7 @@ extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
                         TupleTableSlot *searchslot);
 extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 
+extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
+                                    const char *relname);
 
 #endif   /* EXECUTOR_H  */