Allow batch insertion during COPY into a foreign table.
authorEtsuro Fujita <efujita@postgresql.org>
Thu, 13 Oct 2022 09:45:00 +0000 (18:45 +0900)
committerEtsuro Fujita <efujita@postgresql.org>
Thu, 13 Oct 2022 09:45:00 +0000 (18:45 +0900)
Commit 3d956d956 allowed the COPY, but it's done by inserting individual
rows to the foreign table, so it can be inefficient due to the overhead
caused by each round-trip to the foreign server.  To improve performance
of the COPY in such a case, this patch allows batch insertion, by
extending the multi-insert machinery in CopyFrom() to the foreign-table
case so that we insert multiple rows to the foreign table at once using
the FDW callback routine added by commit b663a4136.  This patch also
allows this for postgres_fdw.  It is enabled by the "batch_size" option
added by commit b663a4136, which is disabled by default.

When doing batch insertion, we update progress of the COPY command after
performing the FDW callback routine, to count rows not suppressed by the
FDW as well as a BEFORE ROW INSERT trigger.  For consistency, this patch
changes the timing of updating it for plain tables: previously, we
updated it immediately after adding each row to the multi-insert buffer,
but we do so only after writing the rows stored in the buffer out to the
table using table_multi_insert(), which I think would be consistent even
with non-batching mode, because in that mode we update it after writing
each row out to the table using table_tuple_insert().

Andrey Lepikhov, heavily revised by me, with review from Ian Barwick,
Andrey Lepikhov, and Zhihong Yu.

Discussion: https://postgr.es/m/bc489202-9855-7550-d64c-ad2d83c24867%40postgrespro.ru

contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/sql/postgres_fdw.sql
doc/src/sgml/fdwhandler.sgml
doc/src/sgml/postgres-fdw.sgml
src/backend/commands/copyfrom.c
src/include/commands/copyfrom_internal.h

index cc9e39c4a5f3897f3cc974db76c0f853989fa8ef..9746998751aa8a320cd8f4ac59110ecc606a21c5 100644 (file)
@@ -8608,6 +8608,39 @@ select tableoid::regclass, * FROM remp1;
  remp1    | 1 | bar
 (2 rows)
 
+delete from ctrtest;
+-- Test copy tuple routing with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+copy ctrtest from stdin;
+select tableoid::regclass, * FROM ctrtest;
+ tableoid | a |   b   
+----------+---+-------
+ remp1    | 1 | foo
+ remp1    | 1 | bar
+ remp1    | 1 | test1
+ remp2    | 2 | baz
+ remp2    | 2 | qux
+ remp2    | 2 | test2
+(6 rows)
+
+select tableoid::regclass, * FROM remp1;
+ tableoid | a |   b   
+----------+---+-------
+ remp1    | 1 | foo
+ remp1    | 1 | bar
+ remp1    | 1 | test1
+(3 rows)
+
+select tableoid::regclass, * FROM remp2;
+ tableoid |   b   | a 
+----------+-------+---
+ remp2    | baz   | 2
+ remp2    | qux   | 2
+ remp2    | test2 | 2
+(3 rows)
+
+delete from ctrtest;
+alter server loopback options (drop batch_size);
 drop table ctrtest;
 drop table loct1;
 drop table loct2;
@@ -8771,6 +8804,78 @@ select * from rem3;
 
 drop foreign table rem3;
 drop table loc3;
+-- Test COPY FROM with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+-- Test basic functionality
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2  
+----+-----
+  1 | foo
+  2 | bar
+  3 | baz
+(3 rows)
+
+delete from rem2;
+-- Test check constraints
+alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
+alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
+-- check constraint is enforced on the remote side, not locally
+copy rem2 from stdin;
+copy rem2 from stdin; -- ERROR
+ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
+DETAIL:  Failing row contains (-1, xyzzy).
+CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
+COPY rem2
+select * from rem2;
+ f1 | f2  
+----+-----
+  1 | foo
+  2 | bar
+  3 | baz
+(3 rows)
+
+alter foreign table rem2 drop constraint rem2_f1positive;
+alter table loc2 drop constraint loc2_f1positive;
+delete from rem2;
+-- Test remote triggers
+create trigger trig_row_before_insert before insert on loc2
+   for each row execute procedure trig_row_before_insupdate();
+-- The new values are concatenated with ' triggered !'
+copy rem2 from stdin;
+select * from rem2;
+ f1 |       f2        
+----+-----------------
+  1 | foo triggered !
+  2 | bar triggered !
+  3 | baz triggered !
+(3 rows)
+
+drop trigger trig_row_before_insert on loc2;
+delete from rem2;
+create trigger trig_null before insert on loc2
+   for each row execute procedure trig_null();
+-- Nothing happens
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2 
+----+----
+(0 rows)
+
+drop trigger trig_null on loc2;
+delete from rem2;
+-- Check with zero-column foreign table; batch insert will be disabled
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(3 rows)
+
+delete from rem2;
+alter server loopback options (drop batch_size);
 -- ===================================================================
 -- test for TRUNCATE
 -- ===================================================================
index 8d013f5b1af3933346af56d79220442e11784163..d98709e5e88118dc18a4e1f4b8c4186dca2daddb 100644 (file)
@@ -2057,6 +2057,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
          resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
        return 1;
 
+   /*
+    * If the foreign table has no columns, disable batching as the INSERT
+    * syntax doesn't allow batching multiple empty rows into a zero-column
+    * table in a single statement.  This is needed for COPY FROM, in which
+    * case fmstate must be non-NULL.
+    */
+   if (fmstate && list_length(fmstate->target_attrs) == 0)
+       return 1;
+
    /*
     * Otherwise use the batch size specified for server/table. The number of
     * parameters in a batch is limited to 65535 (uint16), so make sure we
index e48ccd286bbdd771bbe6c97e29938a6a87654dcd..1962051e541128851bcf6e9e0078cbb91559ffa6 100644 (file)
@@ -2373,6 +2373,28 @@ copy remp1 from stdin;
 
 select tableoid::regclass, * FROM remp1;
 
+delete from ctrtest;
+
+-- Test copy tuple routing with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+
+copy ctrtest from stdin;
+1  foo
+1  bar
+2  baz
+2  qux
+1  test1
+2  test2
+\.
+
+select tableoid::regclass, * FROM ctrtest;
+select tableoid::regclass, * FROM remp1;
+select tableoid::regclass, * FROM remp2;
+
+delete from ctrtest;
+
+alter server loopback options (drop batch_size);
+
 drop table ctrtest;
 drop table loct1;
 drop table loct2;
@@ -2527,6 +2549,86 @@ select * from rem3;
 drop foreign table rem3;
 drop table loc3;
 
+-- Test COPY FROM with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+
+-- Test basic functionality
+copy rem2 from stdin;
+1  foo
+2  bar
+3  baz
+\.
+select * from rem2;
+
+delete from rem2;
+
+-- Test check constraints
+alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
+alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
+
+-- check constraint is enforced on the remote side, not locally
+copy rem2 from stdin;
+1  foo
+2  bar
+3  baz
+\.
+copy rem2 from stdin; -- ERROR
+-1 xyzzy
+\.
+select * from rem2;
+
+alter foreign table rem2 drop constraint rem2_f1positive;
+alter table loc2 drop constraint loc2_f1positive;
+
+delete from rem2;
+
+-- Test remote triggers
+create trigger trig_row_before_insert before insert on loc2
+   for each row execute procedure trig_row_before_insupdate();
+
+-- The new values are concatenated with ' triggered !'
+copy rem2 from stdin;
+1  foo
+2  bar
+3  baz
+\.
+select * from rem2;
+
+drop trigger trig_row_before_insert on loc2;
+
+delete from rem2;
+
+create trigger trig_null before insert on loc2
+   for each row execute procedure trig_null();
+
+-- Nothing happens
+copy rem2 from stdin;
+1  foo
+2  bar
+3  baz
+\.
+select * from rem2;
+
+drop trigger trig_null on loc2;
+
+delete from rem2;
+
+-- Check with zero-column foreign table; batch insert will be disabled
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+
+\.
+select * from rem2;
+
+delete from rem2;
+
+alter server loopback options (drop batch_size);
+
 -- ===================================================================
 -- test for TRUNCATE
 -- ===================================================================
index d0b5951019b8a3e5d379ac79da6a523d8888d53b..94263c628faaaa29845ae14a0b5308d1ae832577 100644 (file)
@@ -665,7 +665,9 @@ ExecForeignBatchInsert(EState *estate,
 
     <para>
      Note that this function is also called when inserting routed tuples into
-     a foreign-table partition.  See the callback functions
+     a foreign-table partition or executing <command>COPY FROM</command> on
+     a foreign table, in which case it is called in a different way than it
+     is in the <command>INSERT</command> case.  See the callback functions
      described below that allow the FDW to support that.
     </para>
 
index bfd344cdc0e35b0fd53acac4708064318899573d..527f4deaaa20d87e5001531d938a54e2d9dcb529 100644 (file)
@@ -398,6 +398,10 @@ OPTIONS (ADD password_required 'false');
        exceeds the limit, the <literal>batch_size</literal> will be adjusted to
        avoid an error.
       </para>
+
+      <para>
+       This option also applies when copying into foreign tables.
+      </para>
      </listitem>
     </varlistentry>
 
index 175aa837f2effe92dd4d0149bff191830ba08245..a079c70152f4b953d3bd8f85fbfdfac159051a98 100644 (file)
@@ -78,7 +78,8 @@ typedef struct CopyMultiInsertBuffer
 {
    TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
    ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
-   BulkInsertState bistate;    /* BulkInsertState for this rel */
+   BulkInsertState bistate;    /* BulkInsertState for this rel if plain
+                                * table; NULL if foreign table */
    int         nused;          /* number of 'slots' containing tuples */
    uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
                                                 * stream */
@@ -116,6 +117,12 @@ CopyFromErrorCallback(void *arg)
 {
    CopyFromState cstate = (CopyFromState) arg;
 
+   if (cstate->relname_only)
+   {
+       errcontext("COPY %s",
+                  cstate->cur_relname);
+       return;
+   }
    if (cstate->opts.binary)
    {
        /* can't usefully display the data */
@@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
    buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
    memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
    buffer->resultRelInfo = rri;
-   buffer->bistate = GetBulkInsertState();
+   buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
    buffer->nused = 0;
 
    return buffer;
@@ -299,83 +306,171 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
  */
 static inline void
 CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-                          CopyMultiInsertBuffer *buffer)
+                          CopyMultiInsertBuffer *buffer,
+                          int64 *processed)
 {
-   MemoryContext oldcontext;
-   int         i;
-   uint64      save_cur_lineno;
    CopyFromState cstate = miinfo->cstate;
    EState     *estate = miinfo->estate;
-   CommandId   mycid = miinfo->mycid;
-   int         ti_options = miinfo->ti_options;
-   bool        line_buf_valid = cstate->line_buf_valid;
    int         nused = buffer->nused;
    ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
    TupleTableSlot **slots = buffer->slots;
+   int         i;
 
-   /*
-    * Print error context information correctly, if one of the operations
-    * below fails.
-    */
-   cstate->line_buf_valid = false;
-   save_cur_lineno = cstate->cur_lineno;
+   if (resultRelInfo->ri_FdwRoutine)
+   {
+       int         batch_size = resultRelInfo->ri_BatchSize;
+       int         sent = 0;
 
-   /*
-    * table_multi_insert may leak memory, so switch to short-lived memory
-    * context before calling it.
-    */
-   oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-   table_multi_insert(resultRelInfo->ri_RelationDesc,
-                      slots,
-                      nused,
-                      mycid,
-                      ti_options,
-                      buffer->bistate);
-   MemoryContextSwitchTo(oldcontext);
+       Assert(buffer->bistate == NULL);
+
+       /* Ensure that the FDW supports batching and it's enabled */
+       Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
+       Assert(batch_size > 1);
 
-   for (i = 0; i < nused; i++)
-   {
        /*
-        * If there are any indexes, update them for all the inserted tuples,
-        * and run AFTER ROW INSERT triggers.
+        * We suppress error context information other than the relation name,
+        * if one of the operations below fails.
         */
-       if (resultRelInfo->ri_NumIndices > 0)
+       Assert(!cstate->relname_only);
+       cstate->relname_only = true;
+
+       while (sent < nused)
        {
-           List       *recheckIndexes;
-
-           cstate->cur_lineno = buffer->linenos[i];
-           recheckIndexes =
-               ExecInsertIndexTuples(resultRelInfo,
-                                     buffer->slots[i], estate, false, false,
-                                     NULL, NIL);
-           ExecARInsertTriggers(estate, resultRelInfo,
-                                slots[i], recheckIndexes,
-                                cstate->transition_capture);
-           list_free(recheckIndexes);
+           int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
+           int         inserted = size;
+           TupleTableSlot **rslots;
+
+           /* insert into foreign table: let the FDW do it */
+           rslots =
+               resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+                                                                    resultRelInfo,
+                                                                    &slots[sent],
+                                                                    NULL,
+                                                                    &inserted);
+
+           sent += size;
+
+           /* No need to do anything if there are no inserted rows */
+           if (inserted <= 0)
+               continue;
+
+           /* Triggers on foreign tables should not have transition tables */
+           Assert(resultRelInfo->ri_TrigDesc == NULL ||
+                  resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
+
+           /* Run AFTER ROW INSERT triggers */
+           if (resultRelInfo->ri_TrigDesc != NULL &&
+               resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+           {
+               Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+               for (i = 0; i < inserted; i++)
+               {
+                   TupleTableSlot *slot = rslots[i];
+
+                   /*
+                    * AFTER ROW Triggers might reference the tableoid column,
+                    * so (re-)initialize tts_tableOid before evaluating them.
+                    */
+                   slot->tts_tableOid = relid;
+
+                   ExecARInsertTriggers(estate, resultRelInfo,
+                                        slot, NIL,
+                                        cstate->transition_capture);
+               }
+           }
+
+           /* Update the row counter and progress of the COPY command */
+           *processed += inserted;
+           pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+                                        *processed);
        }
 
+       for (i = 0; i < nused; i++)
+           ExecClearTuple(slots[i]);
+
+       /* reset relname_only */
+       cstate->relname_only = false;
+   }
+   else
+   {
+       CommandId   mycid = miinfo->mycid;
+       int         ti_options = miinfo->ti_options;
+       bool        line_buf_valid = cstate->line_buf_valid;
+       uint64      save_cur_lineno = cstate->cur_lineno;
+       MemoryContext oldcontext;
+
+       Assert(buffer->bistate != NULL);
+
        /*
-        * There's no indexes, but see if we need to run AFTER ROW INSERT
-        * triggers anyway.
+        * Print error context information correctly, if one of the operations
+        * below fails.
         */
-       else if (resultRelInfo->ri_TrigDesc != NULL &&
-                (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-                 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+       cstate->line_buf_valid = false;
+
+       /*
+        * table_multi_insert may leak memory, so switch to short-lived memory
+        * context before calling it.
+        */
+       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       table_multi_insert(resultRelInfo->ri_RelationDesc,
+                          slots,
+                          nused,
+                          mycid,
+                          ti_options,
+                          buffer->bistate);
+       MemoryContextSwitchTo(oldcontext);
+
+       for (i = 0; i < nused; i++)
        {
-           cstate->cur_lineno = buffer->linenos[i];
-           ExecARInsertTriggers(estate, resultRelInfo,
-                                slots[i], NIL, cstate->transition_capture);
+           /*
+            * If there are any indexes, update them for all the inserted
+            * tuples, and run AFTER ROW INSERT triggers.
+            */
+           if (resultRelInfo->ri_NumIndices > 0)
+           {
+               List       *recheckIndexes;
+
+               cstate->cur_lineno = buffer->linenos[i];
+               recheckIndexes =
+                   ExecInsertIndexTuples(resultRelInfo,
+                                         buffer->slots[i], estate, false,
+                                         false, NULL, NIL);
+               ExecARInsertTriggers(estate, resultRelInfo,
+                                    slots[i], recheckIndexes,
+                                    cstate->transition_capture);
+               list_free(recheckIndexes);
+           }
+
+           /*
+            * There's no indexes, but see if we need to run AFTER ROW INSERT
+            * triggers anyway.
+            */
+           else if (resultRelInfo->ri_TrigDesc != NULL &&
+                    (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+                     resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+           {
+               cstate->cur_lineno = buffer->linenos[i];
+               ExecARInsertTriggers(estate, resultRelInfo,
+                                    slots[i], NIL,
+                                    cstate->transition_capture);
+           }
+
+           ExecClearTuple(slots[i]);
        }
 
-       ExecClearTuple(slots[i]);
+       /* Update the row counter and progress of the COPY command */
+       *processed += nused;
+       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+                                    *processed);
+
+       /* reset cur_lineno and line_buf_valid to what they were */
+       cstate->line_buf_valid = line_buf_valid;
+       cstate->cur_lineno = save_cur_lineno;
    }
 
    /* Mark that all slots are free */
    buffer->nused = 0;
-
-   /* reset cur_lineno and line_buf_valid to what they were */
-   cstate->line_buf_valid = line_buf_valid;
-   cstate->cur_lineno = save_cur_lineno;
 }
 
 /*
@@ -387,22 +482,30 @@ static inline void
 CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
                             CopyMultiInsertBuffer *buffer)
 {
+   ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
    int         i;
 
    /* Ensure buffer was flushed */
    Assert(buffer->nused == 0);
 
    /* Remove back-link to ourself */
-   buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+   resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
-   FreeBulkInsertState(buffer->bistate);
+   if (resultRelInfo->ri_FdwRoutine == NULL)
+   {
+       Assert(buffer->bistate != NULL);
+       FreeBulkInsertState(buffer->bistate);
+   }
+   else
+       Assert(buffer->bistate == NULL);
 
    /* Since we only create slots on demand, just drop the non-null ones. */
    for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
        ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-   table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-                            miinfo->ti_options);
+   if (resultRelInfo->ri_FdwRoutine == NULL)
+       table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+                                miinfo->ti_options);
 
    pfree(buffer);
 }
@@ -418,7 +521,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
  * 'curr_rri'.
  */
 static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
+                        int64 *processed)
 {
    ListCell   *lc;
 
@@ -426,7 +530,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
    {
        CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
 
-       CopyMultiInsertBufferFlush(miinfo, buffer);
+       CopyMultiInsertBufferFlush(miinfo, buffer, processed);
    }
 
    miinfo->bufferedTuples = 0;
@@ -679,6 +783,23 @@ CopyFrom(CopyFromState cstate)
        resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
                                                         resultRelInfo);
 
+   /*
+    * Also, if the named relation is a foreign table, determine if the FDW
+    * supports batch insert and determine the batch size (a FDW may support
+    * batching, but it may be disabled for the server/table).
+    *
+    * If the FDW does not support batching, we set the batch size to 1.
+    */
+   if (resultRelInfo->ri_FdwRoutine != NULL &&
+       resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+       resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+       resultRelInfo->ri_BatchSize =
+           resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+   else
+       resultRelInfo->ri_BatchSize = 1;
+
+   Assert(resultRelInfo->ri_BatchSize >= 1);
+
    /* Prepare to catch AFTER triggers. */
    AfterTriggerBeginQuery();
 
@@ -708,10 +829,11 @@ CopyFrom(CopyFromState cstate)
 
    /*
     * It's generally more efficient to prepare a bunch of tuples for
-    * insertion, and insert them in one table_multi_insert() call, than call
-    * table_tuple_insert() separately for every tuple. However, there are a
-    * number of reasons why we might not be able to do this.  These are
-    * explained below.
+    * insertion, and insert them in one
+    * table_multi_insert()/ExecForeignBatchInsert() call, than call
+    * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
+    * However, there are a number of reasons why we might not be able to do
+    * this.  These are explained below.
     */
    if (resultRelInfo->ri_TrigDesc != NULL &&
        (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -725,6 +847,15 @@ CopyFrom(CopyFromState cstate)
         */
        insertMethod = CIM_SINGLE;
    }
+   else if (resultRelInfo->ri_FdwRoutine != NULL &&
+            resultRelInfo->ri_BatchSize == 1)
+   {
+       /*
+        * Can't support multi-inserts to a foreign table if the FDW does not
+        * support batching, or it's disabled for the server or foreign table.
+        */
+       insertMethod = CIM_SINGLE;
+   }
    else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
             resultRelInfo->ri_TrigDesc->trig_insert_new_table)
    {
@@ -737,14 +868,12 @@ CopyFrom(CopyFromState cstate)
         */
        insertMethod = CIM_SINGLE;
    }
-   else if (resultRelInfo->ri_FdwRoutine != NULL ||
-            cstate->volatile_defexprs)
+   else if (cstate->volatile_defexprs)
    {
        /*
-        * Can't support multi-inserts to foreign tables or if there are any
-        * volatile default expressions in the table.  Similarly to the
-        * trigger case above, such expressions may query the table we're
-        * inserting into.
+        * Can't support multi-inserts if there are any volatile default
+        * expressions in the table.  Similarly to the trigger case above,
+        * such expressions may query the table we're inserting into.
         *
         * Note: It does not matter if any partitions have any volatile
         * default expressions as we use the defaults from the target of the
@@ -767,13 +896,14 @@ CopyFrom(CopyFromState cstate)
         * For partitioned tables, we may still be able to perform bulk
         * inserts.  However, the possibility of this depends on which types
         * of triggers exist on the partition.  We must disable bulk inserts
-        * if the partition is a foreign table or it has any before row insert
-        * or insert instead triggers (same as we checked above for the parent
-        * table).  Since the partition's resultRelInfos are initialized only
-        * when we actually need to insert the first tuple into them, we must
-        * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
-        * flag that we must later determine if we can use bulk-inserts for
-        * the partition being inserted into.
+        * if the partition is a foreign table that can't use batching or it
+        * has any before row insert or insert instead triggers (same as we
+        * checked above for the parent table).  Since the partition's
+        * resultRelInfos are initialized only when we actually need to insert
+        * the first tuple into them, we must have the intermediate insert
+        * method of CIM_MULTI_CONDITIONAL to flag that we must later
+        * determine if we can use bulk-inserts for the partition being
+        * inserted into.
         */
        if (proute)
            insertMethod = CIM_MULTI_CONDITIONAL;
@@ -910,12 +1040,14 @@ CopyFrom(CopyFromState cstate)
 
                /*
                 * Disable multi-inserts when the partition has BEFORE/INSTEAD
-                * OF triggers, or if the partition is a foreign partition.
+                * OF triggers, or if the partition is a foreign table that
+                * can't use batching.
                 */
                leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
                    !has_before_insert_row_trig &&
                    !has_instead_insert_row_trig &&
-                   resultRelInfo->ri_FdwRoutine == NULL;
+                   (resultRelInfo->ri_FdwRoutine == NULL ||
+                    resultRelInfo->ri_BatchSize > 1);
 
                /* Set the multi-insert buffer to use for this partition. */
                if (leafpart_use_multi_insert)
@@ -931,7 +1063,9 @@ CopyFrom(CopyFromState cstate)
                     * Flush pending inserts if this partition can't use
                     * batching, so rows are visible to triggers etc.
                     */
-                   CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+                   CopyMultiInsertInfoFlush(&multiInsertInfo,
+                                            resultRelInfo,
+                                            &processed);
                }
 
                if (bistate != NULL)
@@ -1067,7 +1201,17 @@ CopyFrom(CopyFromState cstate)
                     * buffers out to their tables.
                     */
                    if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-                       CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+                       CopyMultiInsertInfoFlush(&multiInsertInfo,
+                                                resultRelInfo,
+                                                &processed);
+
+                   /*
+                    * We delay updating the row counter and progress of the
+                    * COPY command until after writing the tuples stored in
+                    * the buffer out to the table, as in single insert mode.
+                    * See CopyMultiInsertBufferFlush().
+                    */
+                   continue;   /* next tuple please */
                }
                else
                {
@@ -1130,7 +1274,7 @@ CopyFrom(CopyFromState cstate)
    if (insertMethod != CIM_SINGLE)
    {
        if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-           CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+           CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    }
 
    /* Done, clean up */
@@ -1348,6 +1492,7 @@ BeginCopyFrom(ParseState *pstate,
    cstate->cur_lineno = 0;
    cstate->cur_attname = NULL;
    cstate->cur_attval = NULL;
+   cstate->relname_only = false;
 
    /*
     * Allocate buffers for the input pipeline.
index e37c6032ae611170bb6c70a3561fd0e99314e1e2..8d9cc5accdb45a8e38140561589b9b050013951d 100644 (file)
@@ -40,13 +40,16 @@ typedef enum EolType
 } EolType;
 
 /*
- * Represents the heap insert method to be used during COPY FROM.
+ * Represents the insert method to be used during COPY FROM.
  */
 typedef enum CopyInsertMethod
 {
-   CIM_SINGLE,                 /* use table_tuple_insert or fdw routine */
-   CIM_MULTI,                  /* always use table_multi_insert */
-   CIM_MULTI_CONDITIONAL       /* use table_multi_insert only if valid */
+   CIM_SINGLE,                 /* use table_tuple_insert or
+                                * ExecForeignInsert */
+   CIM_MULTI,                  /* always use table_multi_insert or
+                                * ExecForeignBatchInsert */
+   CIM_MULTI_CONDITIONAL       /* use table_multi_insert or
+                                * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
 /*
@@ -81,6 +84,7 @@ typedef struct CopyFromStateData
    uint64      cur_lineno;     /* line number for error messages */
    const char *cur_attname;    /* current att for error messages */
    const char *cur_attval;     /* current att value for error messages */
+   bool        relname_only;   /* don't output line number, att, etc. */
 
    /*
     * Working state