{
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
- BulkInsertState bistate; /* BulkInsertState for this rel */
+ BulkInsertState bistate; /* BulkInsertState for this rel if plain
+ * table; NULL if foreign table */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
{
CopyFromState cstate = (CopyFromState) arg;
+ if (cstate->relname_only)
+ {
+ errcontext("COPY %s",
+ cstate->cur_relname);
+ return;
+ }
if (cstate->opts.binary)
{
/* can't usefully display the data */
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->resultRelInfo = rri;
- buffer->bistate = GetBulkInsertState();
+ buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
*/
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
- CopyMultiInsertBuffer *buffer)
+ CopyMultiInsertBuffer *buffer,
+ int64 *processed)
{
- MemoryContext oldcontext;
- int i;
- uint64 save_cur_lineno;
CopyFromState cstate = miinfo->cstate;
EState *estate = miinfo->estate;
- CommandId mycid = miinfo->mycid;
- int ti_options = miinfo->ti_options;
- bool line_buf_valid = cstate->line_buf_valid;
int nused = buffer->nused;
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
TupleTableSlot **slots = buffer->slots;
+ int i;
- /*
- * Print error context information correctly, if one of the operations
- * below fails.
- */
- cstate->line_buf_valid = false;
- save_cur_lineno = cstate->cur_lineno;
+ if (resultRelInfo->ri_FdwRoutine)
+ {
+ int batch_size = resultRelInfo->ri_BatchSize;
+ int sent = 0;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
+ Assert(buffer->bistate == NULL);
+
+ /* Ensure that the FDW supports batching and it's enabled */
+ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
+ Assert(batch_size > 1);
- for (i = 0; i < nused; i++)
- {
/*
- * If there are any indexes, update them for all the inserted tuples,
- * and run AFTER ROW INSERT triggers.
+ * We suppress error context information other than the relation name,
+ * if one of the operations below fails.
*/
- if (resultRelInfo->ri_NumIndices > 0)
+ Assert(!cstate->relname_only);
+ cstate->relname_only = true;
+
+ while (sent < nused)
{
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(resultRelInfo,
- buffer->slots[i], estate, false, false,
- NULL, NIL);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
+ int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
+ int inserted = size;
+ TupleTableSlot **rslots;
+
+ /* insert into foreign table: let the FDW do it */
+ rslots =
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+ resultRelInfo,
+ &slots[sent],
+ NULL,
+ &inserted);
+
+ sent += size;
+
+ /* No need to do anything if there are no inserted rows */
+ if (inserted <= 0)
+ continue;
+
+ /* Triggers on foreign tables should not have transition tables */
+ Assert(resultRelInfo->ri_TrigDesc == NULL ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
+
+ /* Run AFTER ROW INSERT triggers */
+ if (resultRelInfo->ri_TrigDesc != NULL &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ {
+ Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ for (i = 0; i < inserted; i++)
+ {
+ TupleTableSlot *slot = rslots[i];
+
+ /*
+ * AFTER ROW Triggers might reference the tableoid column,
+ * so (re-)initialize tts_tableOid before evaluating them.
+ */
+ slot->tts_tableOid = relid;
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ cstate->transition_capture);
+ }
+ }
+
+ /* Update the row counter and progress of the COPY command */
+ *processed += inserted;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *processed);
}
+ for (i = 0; i < nused; i++)
+ ExecClearTuple(slots[i]);
+
+ /* reset relname_only */
+ cstate->relname_only = false;
+ }
+ else
+ {
+ CommandId mycid = miinfo->mycid;
+ int ti_options = miinfo->ti_options;
+ bool line_buf_valid = cstate->line_buf_valid;
+ uint64 save_cur_lineno = cstate->cur_lineno;
+ MemoryContext oldcontext;
+
+ Assert(buffer->bistate != NULL);
+
/*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
+ * Print error context information correctly, if one of the operations
+ * below fails.
*/
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ cstate->line_buf_valid = false;
+
+ /*
+ * table_multi_insert may leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ table_multi_insert(resultRelInfo->ri_RelationDesc,
+ slots,
+ nused,
+ mycid,
+ ti_options,
+ buffer->bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < nused; i++)
{
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL, cstate->transition_capture);
+ /*
+ * If there are any indexes, update them for all the inserted
+ * tuples, and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[i];
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ buffer->slots[i], estate, false,
+ false, NULL, NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], recheckIndexes,
+ cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[i];
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], NIL,
+ cstate->transition_capture);
+ }
+
+ ExecClearTuple(slots[i]);
}
- ExecClearTuple(slots[i]);
+ /* Update the row counter and progress of the COPY command */
+ *processed += nused;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *processed);
+
+ /* reset cur_lineno and line_buf_valid to what they were */
+ cstate->line_buf_valid = line_buf_valid;
+ cstate->cur_lineno = save_cur_lineno;
}
/* Mark that all slots are free */
buffer->nused = 0;
-
- /* reset cur_lineno and line_buf_valid to what they were */
- cstate->line_buf_valid = line_buf_valid;
- cstate->cur_lineno = save_cur_lineno;
}
/*
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
{
+ ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
int i;
/* Ensure buffer was flushed */
Assert(buffer->nused == 0);
/* Remove back-link to ourself */
- buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+ resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
- FreeBulkInsertState(buffer->bistate);
+ if (resultRelInfo->ri_FdwRoutine == NULL)
+ {
+ Assert(buffer->bistate != NULL);
+ FreeBulkInsertState(buffer->bistate);
+ }
+ else
+ Assert(buffer->bistate == NULL);
/* Since we only create slots on demand, just drop the non-null ones. */
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
- table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
+ if (resultRelInfo->ri_FdwRoutine == NULL)
+ table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+ miinfo->ti_options);
pfree(buffer);
}
* 'curr_rri'.
*/
static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
+ int64 *processed)
{
ListCell *lc;
{
CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
- CopyMultiInsertBufferFlush(miinfo, buffer);
+ CopyMultiInsertBufferFlush(miinfo, buffer, processed);
}
miinfo->bufferedTuples = 0;
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
+ /*
+ * Also, if the named relation is a foreign table, determine if the FDW
+ * supports batch insert and determine the batch size (a FDW may support
+ * batching, but it may be disabled for the server/table).
+ *
+ * If the FDW does not support batching, we set the batch size to 1.
+ */
+ if (resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+ resultRelInfo->ri_BatchSize =
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+ else
+ resultRelInfo->ri_BatchSize = 1;
+
+ Assert(resultRelInfo->ri_BatchSize >= 1);
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/*
* It's generally more efficient to prepare a bunch of tuples for
- * insertion, and insert them in one table_multi_insert() call, than call
- * table_tuple_insert() separately for every tuple. However, there are a
- * number of reasons why we might not be able to do this. These are
- * explained below.
+ * insertion, and insert them in one
+ * table_multi_insert()/ExecForeignBatchInsert() call, than call
+ * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
+ * However, there are a number of reasons why we might not be able to do
+ * this. These are explained below.
*/
if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
*/
insertMethod = CIM_SINGLE;
}
+ else if (resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_BatchSize == 1)
+ {
+ /*
+ * Can't support multi-inserts to a foreign table if the FDW does not
+ * support batching, or it's disabled for the server or foreign table.
+ */
+ insertMethod = CIM_SINGLE;
+ }
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
{
*/
insertMethod = CIM_SINGLE;
}
- else if (resultRelInfo->ri_FdwRoutine != NULL ||
- cstate->volatile_defexprs)
+ else if (cstate->volatile_defexprs)
{
/*
- * Can't support multi-inserts to foreign tables or if there are any
- * volatile default expressions in the table. Similarly to the
- * trigger case above, such expressions may query the table we're
- * inserting into.
+ * Can't support multi-inserts if there are any volatile default
+ * expressions in the table. Similarly to the trigger case above,
+ * such expressions may query the table we're inserting into.
*
* Note: It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
* For partitioned tables, we may still be able to perform bulk
* inserts. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts
- * if the partition is a foreign table or it has any before row insert
- * or insert instead triggers (same as we checked above for the parent
- * table). Since the partition's resultRelInfos are initialized only
- * when we actually need to insert the first tuple into them, we must
- * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
- * flag that we must later determine if we can use bulk-inserts for
- * the partition being inserted into.
+ * if the partition is a foreign table that can't use batching or it
+ * has any before row insert or insert instead triggers (same as we
+ * checked above for the parent table). Since the partition's
+ * resultRelInfos are initialized only when we actually need to insert
+ * the first tuple into them, we must have the intermediate insert
+ * method of CIM_MULTI_CONDITIONAL to flag that we must later
+ * determine if we can use bulk-inserts for the partition being
+ * inserted into.
*/
if (proute)
insertMethod = CIM_MULTI_CONDITIONAL;
/*
* Disable multi-inserts when the partition has BEFORE/INSTEAD
- * OF triggers, or if the partition is a foreign partition.
+ * OF triggers, or if the partition is a foreign table that
+ * can't use batching.
*/
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
- resultRelInfo->ri_FdwRoutine == NULL;
+ (resultRelInfo->ri_FdwRoutine == NULL ||
+ resultRelInfo->ri_BatchSize > 1);
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
* Flush pending inserts if this partition can't use
* batching, so rows are visible to triggers etc.
*/
- CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+ CopyMultiInsertInfoFlush(&multiInsertInfo,
+ resultRelInfo,
+ &processed);
}
if (bistate != NULL)
* buffers out to their tables.
*/
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
- CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+ CopyMultiInsertInfoFlush(&multiInsertInfo,
+ resultRelInfo,
+ &processed);
+
+ /*
+ * We delay updating the row counter and progress of the
+ * COPY command until after writing the tuples stored in
+ * the buffer out to the table, as in single insert mode.
+ * See CopyMultiInsertBufferFlush().
+ */
+ continue; /* next tuple please */
}
else
{
if (insertMethod != CIM_SINGLE)
{
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
- CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+ CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
}
/* Done, clean up */
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
+ cstate->relname_only = false;
/*
* Allocate buffers for the input pipeline.