Allow logical replication to transfer data in binary format.
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 18 Jul 2020 16:44:51 +0000 (12:44 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 18 Jul 2020 16:44:51 +0000 (12:44 -0400)
This patch adds a "binary" option to CREATE/ALTER SUBSCRIPTION.
When that's set, the publisher will send data using the data type's
typsend function if any, rather than typoutput.  This is generally
faster, if slightly less robust.

As committed, we won't try to transfer user-defined array or composite
types in binary, for fear that type OIDs won't match at the subscriber.
This might be changed later, but it seems like fit material for a
follow-on patch.

Dave Cramer, reviewed by Daniel Gustafsson, Petr Jelinek, and others;
adjusted some by me

Discussion: https://postgr.es/m/CADK3HH+R3xMn=8t3Ct+uD+qJ1KD=Hbif5NFMJ+d5DkoCzp6Vgw@mail.gmail.com

21 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/include/replication/logicalproto.h
src/include/replication/pgoutput.h
src/include/replication/walreceiver.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/014_binary.pl [new file with mode: 0644]

index e9cdff486415b7d73cb489861ae3389a0d963078..18ab3d434cb1dd6e45511af85b5a83e127f2a6ac 100644 (file)
@@ -7472,7 +7472,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        (references <link linkend="catalog-pg-database"><structname>pg_database</structname></link>.<structfield>oid</structfield>)
       </para>
       <para>
-       OID of the database which the subscription resides in
+       OID of the database that the subscription resides in
       </para></entry>
      </row>
 
@@ -7500,7 +7500,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subenabled</structfield> <type>bool</type>
       </para>
       <para>
-       If true, the subscription is enabled and should be replicating.
+       If true, the subscription is enabled and should be replicating
+      </para></entry>
+     </row>
+
+    <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subbinary</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription will request that the publisher send data
+       in binary format
       </para></entry>
      </row>
 
@@ -7518,8 +7528,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subslotname</structfield> <type>name</type>
       </para>
       <para>
-       Name of the replication slot in the upstream database. Also used
-       for local replication origin name.
+       Name of the replication slot in the upstream database (also used
+       for the local replication origin name)
       </para></entry>
      </row>
 
@@ -7528,8 +7538,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subsynccommit</structfield> <type>text</type>
       </para>
       <para>
-       Contains the value of the <varname>synchronous_commit</varname>
-       setting for the subscription workers.
+       The <varname>synchronous_commit</varname>
+       setting for the subscription's workers to use
       </para></entry>
      </row>
 
@@ -7538,8 +7548,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subpublications</structfield> <type>text[]</type>
       </para>
       <para>
-       Array of subscribed publication names. These reference the
-       publications on the publisher server. For more on publications
+       Array of subscribed publication names. These reference
+       publications defined in the upstream database. For more on publications
        see <xref linkend="logical-replication-publication"/>.
       </para></entry>
      </row>
index c24ace14d10c4e66e6751e03d8e683334f8bbfa1..81c4e70cdf45754c19b6733bf2b62e022719dd7f 100644 (file)
@@ -163,8 +163,10 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
      <para>
       This clause alters parameters originally set by
       <xref linkend="sql-createsubscription"/>.  See there for more
-      information.  The allowed options are <literal>slot_name</literal> and
-      <literal>synchronous_commit</literal>
+      information.  The parameters that can be altered
+      are <literal>slot_name</literal>,
+      <literal>synchronous_commit</literal>, and
+      <literal>binary</literal>.
      </para>
     </listitem>
    </varlistentry>
index 5bbc165f70d552e94365d9f2b3bd5d6fdd8ceb52..cdb22c54feabd41e6e48c80fde07cd3133306f57 100644 (file)
@@ -152,8 +152,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <listitem>
          <para>
           The value of this parameter overrides the
-          <xref linkend="guc-synchronous-commit"/> setting.  The default
-          value is <literal>off</literal>.
+          <xref linkend="guc-synchronous-commit"/> setting within this
+          subscription's apply worker processes.  The default value
+          is <literal>off</literal>.
          </para>
 
          <para>
@@ -178,6 +179,27 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>binary</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to
+          send the data in binary format (as opposed to text).
+          The default is <literal>false</literal>.
+          Even when this option is enabled, only data types that have
+          binary send and receive functions will be transferred in binary.
+         </para>
+
+         <para>
+          When doing cross-version replication, it could happen that the
+          publisher has a binary send function for some data type, but the
+          subscriber lacks a binary receive function for the type.  In
+          such a case, data transfer will fail, and
+          the <literal>binary</literal> option cannot be used.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>connect</literal> (<type>boolean</type>)</term>
         <listitem>
index cb1573111545d331f7a546e9f725314cd5958c2d..e6afb3203e9f16c8127f8016f9736293635c9fe7 100644 (file)
@@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok)
    sub->name = pstrdup(NameStr(subform->subname));
    sub->owner = subform->subowner;
    sub->enabled = subform->subenabled;
+   sub->binary = subform->subbinary;
 
    /* Get conninfo */
    datum = SysCacheGetAttr(SUBSCRIPTIONOID,
index 5ecd2e986bae0289fb34fa76986cdaee8cff2351..8625cbeab6e470da28c9a85fb74b76b45125d432 100644 (file)
@@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
index 9ebb026187f7503419ba97215c2cb804d7a39b02..40b6377a8522d29e557a0326118c323640a7c1f4 100644 (file)
@@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
  * accommodate that.
  */
 static void
-parse_subscription_options(List *options, bool *connect, bool *enabled_given,
-                          bool *enabled, bool *create_slot,
+parse_subscription_options(List *options,
+                          bool *connect,
+                          bool *enabled_given, bool *enabled,
+                          bool *create_slot,
                           bool *slot_name_given, char **slot_name,
-                          bool *copy_data, char **synchronous_commit,
-                          bool *refresh)
+                          bool *copy_data,
+                          char **synchronous_commit,
+                          bool *refresh,
+                          bool *binary_given, bool *binary)
 {
    ListCell   *lc;
    bool        connect_given = false;
@@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
        *synchronous_commit = NULL;
    if (refresh)
        *refresh = true;
+   if (binary)
+   {
+       *binary_given = false;
+       *binary = false;
+   }
 
    /* Parse options */
    foreach(lc, options)
@@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
            refresh_given = true;
            *refresh = defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "binary") == 0 && binary)
+       {
+           if (*binary_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+
+           *binary_given = true;
+           *binary = defGetBoolean(defel);
+       }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
@@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    char       *conninfo;
    char       *slotname;
    bool        slotname_given;
+   bool        binary;
+   bool        binary_given;
    char        originname[NAMEDATALEN];
    bool        create_slot;
    List       *publications;
@@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     *
     * Connection and publication should not be specified here.
     */
-   parse_subscription_options(stmt->options, &connect, &enabled_given,
-                              &enabled, &create_slot, &slotname_given,
-                              &slotname, &copy_data, &synchronous_commit,
-                              NULL);
+   parse_subscription_options(stmt->options,
+                              &connect,
+                              &enabled_given, &enabled,
+                              &create_slot,
+                              &slotname_given, &slotname,
+                              &copy_data,
+                              &synchronous_commit,
+                              NULL,    /* no "refresh" */
+                              &binary_given, &binary);
 
    /*
     * Since creating a replication slot is not transactional, rolling back
@@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
    values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
    values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+   values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
    values[Anum_pg_subscription_subconninfo - 1] =
        CStringGetTextDatum(conninfo);
    if (slotname)
@@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                char       *slotname;
                bool        slotname_given;
                char       *synchronous_commit;
-
-               parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, &slotname_given, &slotname,
-                                          NULL, &synchronous_commit, NULL);
+               bool        binary_given;
+               bool        binary;
+
+               parse_subscription_options(stmt->options,
+                                          NULL,    /* no "connect" */
+                                          NULL, NULL,  /* no "enabled" */
+                                          NULL,    /* no "create_slot" */
+                                          &slotname_given, &slotname,
+                                          NULL,    /* no "copy_data" */
+                                          &synchronous_commit,
+                                          NULL,    /* no "refresh" */
+                                          &binary_given, &binary);
 
                if (slotname_given)
                {
@@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                    replaces[Anum_pg_subscription_subsynccommit - 1] = true;
                }
 
+               if (binary_given)
+               {
+                   values[Anum_pg_subscription_subbinary - 1] =
+                       BoolGetDatum(binary);
+                   replaces[Anum_pg_subscription_subbinary - 1] = true;
+               }
+
                update_tuple = true;
                break;
            }
@@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                bool        enabled,
                            enabled_given;
 
-               parse_subscription_options(stmt->options, NULL,
-                                          &enabled_given, &enabled, NULL,
-                                          NULL, NULL, NULL, NULL, NULL);
+               parse_subscription_options(stmt->options,
+                                          NULL,    /* no "connect" */
+                                          &enabled_given, &enabled,
+                                          NULL,    /* no "create_slot" */
+                                          NULL, NULL,  /* no "slot_name" */
+                                          NULL,    /* no "copy_data" */
+                                          NULL,    /* no "synchronous_commit" */
+                                          NULL,    /* no "refresh" */
+                                          NULL, NULL); /* no "binary" */
                Assert(enabled_given);
 
                if (!sub->slotname && enabled)
@@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                bool        copy_data;
                bool        refresh;
 
-               parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, NULL, NULL, &copy_data,
-                                          NULL, &refresh);
+               parse_subscription_options(stmt->options,
+                                          NULL,    /* no "connect" */
+                                          NULL, NULL,  /* no "enabled" */
+                                          NULL,    /* no "create_slot" */
+                                          NULL, NULL,  /* no "slot_name" */
+                                          &copy_data,
+                                          NULL,    /* no "synchronous_commit" */
+                                          &refresh,
+                                          NULL, NULL); /* no "binary" */
 
                values[Anum_pg_subscription_subpublications - 1] =
                    publicationListToArray(stmt->publication);
@@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                            (errcode(ERRCODE_SYNTAX_ERROR),
                             errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-               parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, NULL, NULL, &copy_data,
-                                          NULL, NULL);
+               parse_subscription_options(stmt->options,
+                                          NULL,    /* no "connect" */
+                                          NULL, NULL,  /* no "enabled" */
+                                          NULL,    /* no "create_slot" */
+                                          NULL, NULL,  /* no "slot_name" */
+                                          &copy_data,
+                                          NULL,    /* no "synchronous_commit" */
+                                          NULL,    /* no "refresh" */
+                                          NULL, NULL); /* no "binary" */
 
                AlterSubscription_refresh(sub, copy_data);
 
index e4fd1f9bb6f0df4a8f0b5713eade8532d1111814..e9057230e40c3514abd0269dfe401b13b3ace5b7 100644 (file)
@@ -424,6 +424,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
        PQfreemem(pubnames_literal);
        pfree(pubnames_str);
 
+       if (options->proto.logical.binary &&
+           PQserverVersion(conn->streamConn) >= 140000)
+           appendStringInfoString(&cmd, ", binary 'true'");
+
        appendStringInfoChar(&cmd, ')');
    }
    else
index 3c6d0cd17139353adf9430b313727e689c6a6e8a..2b1356ee249fc3af40b646dc76158f552d125dea 100644 (file)
@@ -17,7 +17,6 @@
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
 #include "replication/logicalproto.h"
-#include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -31,7 +30,7 @@
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-                                  HeapTuple tuple);
+                                  HeapTuple tuple, bool binary);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -139,7 +138,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  * Write INSERT to the output stream.
  */
 void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
+logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
 {
    pq_sendbyte(out, 'I');      /* action INSERT */
 
@@ -147,7 +146,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
    pq_sendint32(out, RelationGetRelid(rel));
 
    pq_sendbyte(out, 'N');      /* new tuple follows */
-   logicalrep_write_tuple(out, rel, newtuple);
+   logicalrep_write_tuple(out, rel, newtuple, binary);
 }
 
 /*
@@ -179,7 +178,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-                       HeapTuple newtuple)
+                       HeapTuple newtuple, bool binary)
 {
    pq_sendbyte(out, 'U');      /* action UPDATE */
 
@@ -196,11 +195,11 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
            pq_sendbyte(out, 'O');  /* old tuple follows */
        else
            pq_sendbyte(out, 'K');  /* old key follows */
-       logicalrep_write_tuple(out, rel, oldtuple);
+       logicalrep_write_tuple(out, rel, oldtuple, binary);
    }
 
    pq_sendbyte(out, 'N');      /* new tuple follows */
-   logicalrep_write_tuple(out, rel, newtuple);
+   logicalrep_write_tuple(out, rel, newtuple, binary);
 }
 
 /*
@@ -248,7 +247,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
+logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
 {
    Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
           rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -264,7 +263,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
    else
        pq_sendbyte(out, 'K');  /* old key follows */
 
-   logicalrep_write_tuple(out, rel, oldtuple);
+   logicalrep_write_tuple(out, rel, oldtuple, binary);
 }
 
 /*
@@ -437,7 +436,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
 {
    TupleDesc   desc;
    Datum       values[MaxTupleAttributeNumber];
@@ -474,12 +473,18 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 
        if (isnull[i])
        {
-           pq_sendbyte(out, 'n');  /* null column */
+           pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
            continue;
        }
-       else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+
+       if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
        {
-           pq_sendbyte(out, 'u');  /* unchanged toast column */
+           /*
+            * Unchanged toasted datum.  (Note that we don't promise to detect
+            * unchanged data in general; this is just a cheap check to avoid
+            * sending large values unnecessarily.)
+            */
+           pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
            continue;
        }
 
@@ -488,20 +493,48 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
            elog(ERROR, "cache lookup failed for type %u", att->atttypid);
        typclass = (Form_pg_type) GETSTRUCT(typtup);
 
-       pq_sendbyte(out, 't');  /* 'text' data follows */
-
-       outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
-       pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
-       pfree(outputstr);
+       /*
+        * Choose whether to send in binary.  Obviously, the option must be
+        * requested and the type must have a send function.  Also, if the
+        * type is not built-in then it must not be a composite or array type.
+        * Such types contain type OIDs, which will likely not match at the
+        * receiver if it's not a built-in type.
+        *
+        * XXX this could be relaxed if we changed record_recv and array_recv
+        * to be less picky.
+        *
+        * XXX this fails to apply the restriction to domains over such types.
+        */
+       if (binary &&
+           OidIsValid(typclass->typsend) &&
+           (att->atttypid < FirstGenbkiObjectId ||
+            (typclass->typtype != TYPTYPE_COMPOSITE &&
+             typclass->typelem == InvalidOid)))
+       {
+           bytea      *outputbytes;
+           int         len;
+
+           pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
+           outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
+           len = VARSIZE(outputbytes) - VARHDRSZ;
+           pq_sendint(out, len, 4);    /* length */
+           pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
+           pfree(outputbytes);
+       }
+       else
+       {
+           pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
+           outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
+           pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
+           pfree(outputstr);
+       }
 
        ReleaseSysCache(typtup);
    }
 }
 
 /*
- * Read tuple in remote format from stream.
- *
- * The returned tuple points into the input stringinfo.
+ * Read tuple in logical replication format from stream.
  */
 static void
 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
@@ -512,38 +545,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
    /* Get number of attributes */
    natts = pq_getmsgint(in, 2);
 
-   memset(tuple->changed, 0, sizeof(tuple->changed));
+   /* Allocate space for per-column values; zero out unused StringInfoDatas */
+   tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
+   tuple->colstatus = (char *) palloc(natts * sizeof(char));
 
    /* Read the data */
    for (i = 0; i < natts; i++)
    {
        char        kind;
+       int         len;
+       StringInfo  value = &tuple->colvalues[i];
 
        kind = pq_getmsgbyte(in);
+       tuple->colstatus[i] = kind;
 
        switch (kind)
        {
-           case 'n':           /* null */
-               tuple->values[i] = NULL;
-               tuple->changed[i] = true;
+           case LOGICALREP_COLUMN_NULL:
+               /* nothing more to do */
                break;
-           case 'u':           /* unchanged column */
+           case LOGICALREP_COLUMN_UNCHANGED:
                /* we don't receive the value of an unchanged column */
-               tuple->values[i] = NULL;
                break;
-           case 't':           /* text formatted value */
-               {
-                   int         len;
-
-                   tuple->changed[i] = true;
-
-                   len = pq_getmsgint(in, 4);  /* read length */
-
-                   /* and data */
-                   tuple->values[i] = palloc(len + 1);
-                   pq_copymsgbytes(in, tuple->values[i], len);
-                   tuple->values[i][len] = '\0';
-               }
+           case LOGICALREP_COLUMN_TEXT:
+               len = pq_getmsgint(in, 4);  /* read length */
+
+               /* and data */
+               value->data = palloc(len + 1);
+               pq_copymsgbytes(in, value->data, len);
+               value->data[len] = '\0';
+               /* make StringInfo fully valid */
+               value->len = len;
+               value->cursor = 0;
+               value->maxlen = len;
+               break;
+           case LOGICALREP_COLUMN_BINARY:
+               len = pq_getmsgint(in, 4);  /* read length */
+
+               /* and data */
+               value->data = palloc(len + 1);
+               pq_copymsgbytes(in, value->data, len);
+               /* not strictly necessary but per StringInfo practice */
+               value->data[len] = '\0';
+               /* make StringInfo fully valid */
+               value->len = len;
+               value->cursor = 0;
+               value->maxlen = len;
                break;
            default:
                elog(ERROR, "unrecognized data representation type '%c'", kind);
@@ -552,7 +599,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 }
 
 /*
- * Write relation attributes to the stream.
+ * Write relation attribute metadata to the stream.
  */
 static void
 logicalrep_write_attrs(StringInfo out, Relation rel)
@@ -611,7 +658,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 }
 
 /*
- * Read relation attribute names from the stream.
+ * Read relation attribute metadata from the stream.
  */
 static void
 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
index f90a896fc3e974878b904bfae253b0dbf1a3670c..407eee3c0bc9e7db0bea25eb01d4f7cf1aa10673 100644 (file)
@@ -319,13 +319,13 @@ slot_store_error_callback(void *arg)
 }
 
 /*
- * Store data in C string form into slot.
- * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
- * use better.
+ * Store tuple data into slot.
+ *
+ * Incoming data can be either text or binary format.
  */
 static void
-slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
-                   char **values)
+slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+               LogicalRepTupleData *tupleData)
 {
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
@@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;
 
-   /* Call the "in" function for each non-dropped attribute */
+   /* Call the "in" function for each non-dropped, non-null attribute */
    Assert(natts == rel->attrmap->maplen);
    for (i = 0; i < natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap->attnums[i];
 
-       if (!att->attisdropped && remoteattnum >= 0 &&
-           values[remoteattnum] != NULL)
+       if (!att->attisdropped && remoteattnum >= 0)
        {
-           Oid         typinput;
-           Oid         typioparam;
+           StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
 
            errarg.local_attnum = i;
            errarg.remote_attnum = remoteattnum;
 
-           getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-           slot->tts_values[i] =
-               OidInputFunctionCall(typinput, values[remoteattnum],
-                                    typioparam, att->atttypmod);
-           slot->tts_isnull[i] = false;
+           if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+           {
+               Oid         typinput;
+               Oid         typioparam;
+
+               getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+               slot->tts_values[i] =
+                   OidInputFunctionCall(typinput, colvalue->data,
+                                        typioparam, att->atttypmod);
+               slot->tts_isnull[i] = false;
+           }
+           else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+           {
+               Oid         typreceive;
+               Oid         typioparam;
+
+               /*
+                * In some code paths we may be asked to re-parse the same
+                * tuple data.  Reset the StringInfo's cursor so that works.
+                */
+               colvalue->cursor = 0;
+
+               getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+               slot->tts_values[i] =
+                   OidReceiveFunctionCall(typreceive, colvalue,
+                                          typioparam, att->atttypmod);
+
+               /* Trouble if it didn't eat the whole buffer */
+               if (colvalue->cursor != colvalue->len)
+                   ereport(ERROR,
+                           (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+                            errmsg("incorrect binary data format in logical replication column %d",
+                                   remoteattnum + 1)));
+               slot->tts_isnull[i] = false;
+           }
+           else
+           {
+               /*
+                * NULL value from remote.  (We don't expect to see
+                * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
+                * NULL.)
+                */
+               slot->tts_values[i] = (Datum) 0;
+               slot->tts_isnull[i] = true;
+           }
 
            errarg.local_attnum = -1;
            errarg.remote_attnum = -1;
@@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
        else
        {
            /*
-            * We assign NULL to dropped attributes, NULL values, and missing
-            * values (missing values should be later filled using
+            * We assign NULL to dropped attributes and missing values
+            * (missing values should be later filled using
             * slot_fill_defaults).
             */
            slot->tts_values[i] = (Datum) 0;
@@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 }
 
 /*
- * Replace selected columns with user data provided as C strings.
+ * Replace updated columns with data from the LogicalRepTupleData struct.
  * This is somewhat similar to heap_modify_tuple but also calls the type
  * input functions on the user data.
- * "slot" is filled with a copy of the tuple in "srcslot", with
- * columns selected by the "replaces" array replaced with data values
- * from "values".
+ *
+ * "slot" is filled with a copy of the tuple in "srcslot", replacing
+ * columns provided in "tupleData" and leaving others as-is.
+ *
  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
  * storage for "srcslot".  This is OK for current usage, but someday we may
  * need to materialize "slot" at the end to make it independent of "srcslot".
  */
 static void
-slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
-                    LogicalRepRelMapEntry *rel,
-                    char **values, bool *replaces)
+slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
+                LogicalRepRelMapEntry *rel,
+                LogicalRepTupleData *tupleData)
 {
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
@@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
        if (remoteattnum < 0)
            continue;
 
-       if (!replaces[remoteattnum])
-           continue;
-
-       if (values[remoteattnum] != NULL)
+       if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
        {
-           Oid         typinput;
-           Oid         typioparam;
+           StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
 
            errarg.local_attnum = i;
            errarg.remote_attnum = remoteattnum;
 
-           getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-           slot->tts_values[i] =
-               OidInputFunctionCall(typinput, values[remoteattnum],
-                                    typioparam, att->atttypmod);
-           slot->tts_isnull[i] = false;
+           if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+           {
+               Oid         typinput;
+               Oid         typioparam;
+
+               getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+               slot->tts_values[i] =
+                   OidInputFunctionCall(typinput, colvalue->data,
+                                        typioparam, att->atttypmod);
+               slot->tts_isnull[i] = false;
+           }
+           else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+           {
+               Oid         typreceive;
+               Oid         typioparam;
+
+               /*
+                * In some code paths we may be asked to re-parse the same
+                * tuple data.  Reset the StringInfo's cursor so that works.
+                */
+               colvalue->cursor = 0;
+
+               getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+               slot->tts_values[i] =
+                   OidReceiveFunctionCall(typreceive, colvalue,
+                                          typioparam, att->atttypmod);
+
+               /* Trouble if it didn't eat the whole buffer */
+               if (colvalue->cursor != colvalue->len)
+                   ereport(ERROR,
+                           (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+                            errmsg("incorrect binary data format in logical replication column %d",
+                                   remoteattnum + 1)));
+               slot->tts_isnull[i] = false;
+           }
+           else
+           {
+               /* must be LOGICALREP_COLUMN_NULL */
+               slot->tts_values[i] = (Datum) 0;
+               slot->tts_isnull[i] = true;
+           }
 
            errarg.local_attnum = -1;
            errarg.remote_attnum = -1;
        }
-       else
-       {
-           slot->tts_values[i] = (Datum) 0;
-           slot->tts_isnull[i] = true;
-       }
    }
 
    /* Pop the error context stack */
@@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s)
 
    /* Process and store remote tuple in the slot */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-   slot_store_cstrings(remoteslot, rel, newtup.values);
+   slot_store_data(remoteslot, rel, &newtup);
    slot_fill_defaults(rel, estate, remoteslot);
    MemoryContextSwitchTo(oldctx);
 
@@ -765,7 +831,7 @@ apply_handle_update(StringInfo s)
    target_rte = list_nth(estate->es_range_table, 0);
    for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    {
-       if (newtup.changed[i])
+       if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED)
            target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
                                                     i + 1 - FirstLowInvalidHeapAttributeNumber);
    }
@@ -776,8 +842,8 @@ apply_handle_update(StringInfo s)
 
    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-   slot_store_cstrings(remoteslot, rel,
-                       has_oldtup ? oldtup.values : newtup.values);
+   slot_store_data(remoteslot, rel,
+                   has_oldtup ? &oldtup : &newtup);
    MemoryContextSwitchTo(oldctx);
 
    /* For a partitioned table, apply update to correct partition. */
@@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
    {
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-       slot_modify_cstrings(remoteslot, localslot, relmapentry,
-                            newtup->values, newtup->changed);
+       slot_modify_data(remoteslot, localslot, relmapentry, newtup);
        MemoryContextSwitchTo(oldctx);
 
        EvalPlanQualSetSlot(&epqstate, remoteslot);
@@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s)
 
    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-   slot_store_cstrings(remoteslot, rel, oldtup.values);
+   slot_store_data(remoteslot, rel, &oldtup);
    MemoryContextSwitchTo(oldctx);
 
    /* For a partitioned table, apply delete to correct partition. */
@@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
                if (found)
                {
                    /* Apply the update.  */
-                   slot_modify_cstrings(remoteslot_part, localslot,
-                                        part_entry,
-                                        newtup->values, newtup->changed);
+                   slot_modify_data(remoteslot_part, localslot,
+                                    part_entry,
+                                    newtup);
                    MemoryContextSwitchTo(oldctx);
                }
                else
@@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s)
    }
 
    /*
-    * Even if we used CASCADE on the upstream primary we explicitly default to
-    * replaying changes without further cascading. This might be later
+    * Even if we used CASCADE on the upstream primary we explicitly default
+    * to replaying changes without further cascading. This might be later
     * changeable with a user specified option.
     */
    ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
@@ -1850,60 +1915,21 @@ maybe_reread_subscription(void)
        proc_exit(0);
    }
 
-   /*
-    * Exit if connection string was changed. The launcher will start new
-    * worker.
-    */
-   if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
-   {
-       ereport(LOG,
-               (errmsg("logical replication apply worker for subscription \"%s\" will "
-                       "restart because the connection information was changed",
-                       MySubscription->name)));
-
-       proc_exit(0);
-   }
-
-   /*
-    * Exit if subscription name was changed (it's used for
-    * fallback_application_name). The launcher will start new worker.
-    */
-   if (strcmp(newsub->name, MySubscription->name) != 0)
-   {
-       ereport(LOG,
-               (errmsg("logical replication apply worker for subscription \"%s\" will "
-                       "restart because subscription was renamed",
-                       MySubscription->name)));
-
-       proc_exit(0);
-   }
-
    /* !slotname should never happen when enabled is true. */
    Assert(newsub->slotname);
 
    /*
-    * We need to make new connection to new slot if slot name has changed so
-    * exit here as well if that's the case.
-    */
-   if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
-   {
-       ereport(LOG,
-               (errmsg("logical replication apply worker for subscription \"%s\" will "
-                       "restart because the replication slot name was changed",
-                       MySubscription->name)));
-
-       proc_exit(0);
-   }
-
-   /*
-    * Exit if publication list was changed. The launcher will start new
-    * worker.
+    * Exit if any parameter that affects the remote connection was changed.
+    * The launcher will start a new worker.
     */
-   if (!equal(newsub->publications, MySubscription->publications))
+   if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
+       strcmp(newsub->name, MySubscription->name) != 0 ||
+       strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
+       newsub->binary != MySubscription->binary ||
+       !equal(newsub->publications, MySubscription->publications))
    {
        ereport(LOG,
-               (errmsg("logical replication apply worker for subscription \"%s\" will "
-                       "restart because subscription's publications were changed",
+               (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                        MySubscription->name)));
 
        proc_exit(0);
@@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg)
    options.slotname = myslotname;
    options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    options.proto.logical.publication_names = MySubscription->publications;
+   options.proto.logical.binary = MySubscription->binary;
 
    /* Start normal logical streaming replication. */
    walrcv_startstreaming(wrconn, &options);
index 15379e311819a6e6e8efd68029470ad969987f22..81ef7dc4c1a33c1380cbcb5705c4be4faee3d786 100644 (file)
@@ -15,6 +15,7 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
@@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-                       List **publication_names)
+                       List **publication_names, bool *binary)
 {
    ListCell   *lc;
    bool        protocol_version_given = false;
    bool        publication_names_given = false;
+   bool        binary_option_given = false;
+
+   *binary = false;
 
    foreach(lc, options)
    {
@@ -168,6 +172,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
                        (errcode(ERRCODE_INVALID_NAME),
                         errmsg("invalid publication_names syntax")));
        }
+       else if (strcmp(defel->defname, "binary") == 0)
+       {
+           if (binary_option_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           binary_option_given = true;
+
+           *binary = defGetBoolean(defel);
+       }
        else
            elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
    }
@@ -202,7 +216,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        /* Parse the params and ERROR if we see any we don't recognize */
        parse_output_parameters(ctx->output_plugin_options,
                                &data->protocol_version,
-                               &data->publication_names);
+                               &data->publication_names,
+                               &data->binary);
 
        /* Check if we support requested protocol */
        if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -411,7 +426,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_insert(ctx->out, relation, tuple);
+               logicalrep_write_insert(ctx->out, relation, tuple,
+                                       data->binary);
                OutputPluginWrite(ctx, true);
                break;
            }
@@ -435,7 +451,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
+               logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
+                                       data->binary);
                OutputPluginWrite(ctx, true);
                break;
            }
@@ -455,7 +472,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_delete(ctx->out, relation, oldtuple);
+               logicalrep_write_delete(ctx->out, relation, oldtuple,
+                                       data->binary);
                OutputPluginWrite(ctx, true);
            }
            else
index 857c7c2278ad37329b49cb9fd5d1167eac2a8504..94459b3539ad4443327ac3a85884c00a68720662 100644 (file)
@@ -4205,6 +4205,7 @@ getSubscriptions(Archive *fout)
    int         i_subslotname;
    int         i_subsynccommit;
    int         i_subpublications;
+   int         i_subbinary;
    int         i,
                ntups;
 
@@ -4229,18 +4230,26 @@ getSubscriptions(Archive *fout)
 
    query = createPQExpBuffer();
 
-   resetPQExpBuffer(query);
-
    /* Get the subscriptions in current database. */
    appendPQExpBuffer(query,
-                     "SELECT s.tableoid, s.oid, s.subname,"
-                     "(%s s.subowner) AS rolname, "
-                     " s.subconninfo, s.subslotname, s.subsynccommit, "
-                     " s.subpublications "
-                     "FROM pg_subscription s "
-                     "WHERE s.subdbid = (SELECT oid FROM pg_database"
-                     "                   WHERE datname = current_database())",
+                     "SELECT s.tableoid, s.oid, s.subname,\n"
+                     " (%s s.subowner) AS rolname,\n"
+                     " s.subconninfo, s.subslotname, s.subsynccommit,\n"
+                     " s.subpublications,\n",
                      username_subquery);
+
+   if (fout->remoteVersion >= 140000)
+       appendPQExpBuffer(query,
+                         " s.subbinary\n");
+   else
+       appendPQExpBuffer(query,
+                         " false AS subbinary\n");
+
+   appendPQExpBuffer(query,
+                     "FROM pg_subscription s\n"
+                     "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+                     "                   WHERE datname = current_database())");
+
    res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
    ntups = PQntuples(res);
@@ -4253,6 +4262,7 @@ getSubscriptions(Archive *fout)
    i_subslotname = PQfnumber(res, "subslotname");
    i_subsynccommit = PQfnumber(res, "subsynccommit");
    i_subpublications = PQfnumber(res, "subpublications");
+   i_subbinary = PQfnumber(res, "subbinary");
 
    subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4274,6 +4284,8 @@ getSubscriptions(Archive *fout)
            pg_strdup(PQgetvalue(res, i, i_subsynccommit));
        subinfo[i].subpublications =
            pg_strdup(PQgetvalue(res, i, i_subpublications));
+       subinfo[i].subbinary =
+           pg_strdup(PQgetvalue(res, i, i_subbinary));
 
        if (strlen(subinfo[i].rolname) == 0)
            pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4342,6 +4354,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
    else
        appendPQExpBufferStr(query, "NONE");
 
+   if (strcmp(subinfo->subbinary, "t") == 0)
+       appendPQExpBuffer(query, ", binary = true");
+
    if (strcmp(subinfo->subsynccommit, "off") != 0)
        appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
index 0c2fcfb3a9ca42a973e246331e86af94e8cca06d..da97b731b1591c2ba5a7dfc4f4a6d93941e0d51d 100644 (file)
@@ -625,6 +625,7 @@ typedef struct _SubscriptionInfo
    char       *rolname;
    char       *subconninfo;
    char       *subslotname;
+   char       *subbinary;
    char       *subsynccommit;
    char       *subpublications;
 } SubscriptionInfo;
index 3b870c3b17e238c327d1b7ec3833f8c54c5e77bd..e197dcdb4d23ae5048807f63f2afc76e925fec5f 100644 (file)
@@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose)
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
    static const bool translate_columns[] = {false, false, false, false,
-   false, false};
+   false, false, false};
 
    if (pset.sversion < 100000)
    {
@@ -5989,6 +5989,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 
    if (verbose)
    {
+       /* Binary mode is only supported in v14 and higher */
+       if (pset.sversion >= 140000)
+           appendPQExpBuffer(&buf,
+                             ", subbinary AS \"%s\"\n",
+                             gettext_noop("Binary"));
+
        appendPQExpBuffer(&buf,
                          ",  subsynccommit AS \"%s\"\n"
                          ",  subconninfo AS \"%s\"\n",
index 6b3aa7c0063e28eef06bd76701cab8453e41e540..ccb586ad00f4109afad967a7911e1dd480543b6e 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202007131
+#define CATALOG_VERSION_NO 202007181
 
 #endif
index 0a756d42d8453fad1f2c9e24be72936d73b67e97..a041ce9740adc1d1a02081f128eb96ad833738bd 100644 (file)
@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
    bool        subenabled;     /* True if the subscription is enabled (the
                                 * worker should be running) */
 
+   bool        subbinary;      /* True if the subscription wants the
+                                * publisher to send data in binary */
+
 #ifdef CATALOG_VARLEN          /* variable-length fields start here */
    /* Connection string to the publisher */
    text        subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +76,8 @@ typedef struct Subscription
    char       *name;           /* Name of the subscription */
    Oid         owner;          /* Oid of the subscription owner */
    bool        enabled;        /* Indicates if the subscription is enabled */
+   bool        binary;         /* Indicates if the subscription wants data in
+                                * binary format */
    char       *conninfo;       /* Connection string to the publisher */
    char       *slotname;       /* Name of the replication slot */
    char       *synccommit;     /* Synchronous commit setting for worker */
index 4860561be9f5de1f8cc4b05c8eda4c063c2009b0..287288ab415cfda872159942bd3f3409e436bad3 100644 (file)
 /* Tuple coming via logical replication. */
 typedef struct LogicalRepTupleData
 {
-   /* column values in text format, or NULL for a null value: */
-   char       *values[MaxTupleAttributeNumber];
-   /* markers for changed/unchanged column values: */
-   bool        changed[MaxTupleAttributeNumber];
+   /* Array of StringInfos, one per column; some may be unused */
+   StringInfoData *colvalues;
+   /* Array of markers for null/unchanged/text/binary, one per column */
+   char       *colstatus;
 } LogicalRepTupleData;
 
+/* Possible values for LogicalRepTupleData.colstatus[colnum] */
+/* These values are also used in the on-the-wire protocol */
+#define LOGICALREP_COLUMN_NULL     'n'
+#define LOGICALREP_COLUMN_UNCHANGED    'u'
+#define LOGICALREP_COLUMN_TEXT     't'
+#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
+
 typedef uint32 LogicalRepRelId;
 
 /* Relation information */
@@ -87,15 +94,15 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
                                    XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, Relation rel,
-                                   HeapTuple newtuple);
+                                   HeapTuple newtuple, bool binary);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-                                   HeapTuple newtuple);
+                                   HeapTuple newtuple, bool binary);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
                                              bool *has_oldtuple, LogicalRepTupleData *oldtup,
                                              LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, Relation rel,
-                                   HeapTuple oldtuple);
+                                   HeapTuple oldtuple, bool binary);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
                                              LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
index 2e8e9daf44576e8f9faec3ef1f7619406bbaac4e..a8c676ed23b52ddf51ba7655a946bf4dceee5b00 100644 (file)
@@ -20,11 +20,11 @@ typedef struct PGOutputData
    MemoryContext context;      /* private memory context for transient
                                 * allocations */
 
-   /* client info */
+   /* client-supplied info: */
    uint32      protocol_version;
-
    List       *publication_names;
    List       *publications;
+   bool        binary;
 } PGOutputData;
 
 #endif                         /* PGOUTPUT_H */
index c75dcebea0c1cfc6d32a8436d402c01053d7eeab..c2d5dbee5491622137b1dcbd4150096be644a893 100644 (file)
@@ -177,6 +177,7 @@ typedef struct
        {
            uint32      proto_version;  /* Logical protocol version */
            List       *publication_names;  /* String list of publications */
+           bool        binary; /* Ask publisher to use binary */
        }           logical;
    }           proto;
 } WalRcvStreamOptions;
index e7add9d2b81be9b9c1d2e7634fcec1596613c781..d71db0d520748375037e522cc8ae009989766a27 100644 (file)
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | off                | dbname=regress_doesnotexist
+                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | off                | dbname=regress_doesnotexist2
+                                                          List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                        List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | local              | dbname=regress_doesnotexist2
+                                                            List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -155,6 +155,29 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub;
 NOTICE:  subscription "regress_testsub" does not exist, skipping
 DROP SUBSCRIPTION regress_testsub;  -- fail
 ERROR:  subscription "regress_testsub" does not exist
+-- fail - binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
+ERROR:  binary requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off                | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
index 9e234ab8b3f7be6a20daa747a34742fd0977ec20..eeb2ec06ebed24f4fedee1d1b547a9b842e52310 100644 (file)
@@ -117,6 +117,21 @@ COMMIT;
 DROP SUBSCRIPTION IF EXISTS regress_testsub;
 DROP SUBSCRIPTION regress_testsub;  -- fail
 
+-- fail - binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
new file mode 100644 (file)
index 0000000..36a2f58
--- /dev/null
@@ -0,0 +1,134 @@
+# Binary mode logical replication test
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create and initialize a publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create and initialize subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables on both sides of the replication
+my $ddl = qq(
+   CREATE TABLE public.test_numerical (
+       a INTEGER PRIMARY KEY,
+       b NUMERIC,
+       c FLOAT,
+       d BIGINT
+       );
+   CREATE TABLE public.test_arrays (
+       a INTEGER[] PRIMARY KEY,
+       b NUMERIC[],
+       c TEXT[]
+       ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Configure logical replication
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tpub FOR ALL TABLES");
+
+my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
+     . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
+
+# Ensure nodes are in sync with each other
+$node_publisher->wait_for_catchup('tsub');
+$node_subscriber->poll_query_until('postgres',
+   "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
+) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert some content and make sure it's replicated across
+$node_publisher->safe_psql(
+   'postgres', qq(
+   INSERT INTO public.test_arrays (a, b, c) VALUES
+       ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+       ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+
+   INSERT INTO public.test_numerical (a, b, c, d) VALUES
+       (1, 1.2, 1.3, 10),
+       (2, 2.2, 2.3, 20),
+       (3, 3.2, 3.3, 30);
+   ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+   "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+3|3.2|3.3|30', 'check replicated data on subscriber');
+
+# Test updates as well
+$node_publisher->safe_psql(
+   'postgres', qq(
+   UPDATE public.test_arrays SET b[1] = 42, c = NULL;
+   UPDATE public.test_numerical SET b = 42, c = NULL;
+   ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT a, b, c FROM test_arrays ORDER BY a");
+
+is( $result, '{1,2,3}|{42,1.2,1.3}|
+{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|42||10
+2|42||20
+3|42||30', 'check updated replicated data on subscriber');
+
+# Test to reset back to text formatting, and then to binary again
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tsub SET (binary = false);");
+
+$node_publisher->safe_psql(
+   'postgres', qq(
+   INSERT INTO public.test_numerical (a, b, c, d) VALUES
+       (4, 4.2, 4.3, 40);
+   ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|42||10
+2|42||20
+3|42||30
+4|4.2|4.3|40', 'check replicated data on subscriber');
+
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tsub SET (binary = true);");
+
+$node_publisher->safe_psql(
+   'postgres', qq(
+   INSERT INTO public.test_arrays (a, b, c) VALUES
+       ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}');
+   ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT a, b, c FROM test_arrays ORDER BY a");
+
+is( $result, '{1,2,3}|{42,1.2,1.3}|
+{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
+{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');