</entry>
</row>
+ <row>
+ <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
+ <entry>One row for each backend running <command>COPY</command>, showing current progress.
+ See <xref linkend='copy-progress-reporting'/>.
+ </entry>
+ </row>
</tbody>
</tgroup>
</table>
which support progress reporting are <command>ANALYZE</command>,
<command>CLUSTER</command>,
<command>CREATE INDEX</command>, <command>VACUUM</command>,
+ <command>COPY</command>,
and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
command that <xref linkend="app-pgbasebackup"/> issues to take
a base backup).
</table>
</sect2>
+
+ <sect2 id="copy-progress-reporting">
+ <title>COPY Progress Reporting</title>
+
+ <indexterm>
+ <primary>pg_stat_progress_copy</primary>
+ </indexterm>
+
+ <para>
+ Whenever <command>COPY</command> is running, the
+ <structname>pg_stat_progress_copy</structname> view will contain one row
+ for each backend that is currently running <command>COPY</command> command.
+ The table bellow describes the information that will be reported and provide
+ information how to interpret it.
+ </para>
+
+ <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
+ <title><structname>pg_stat_progress_copy</structname> View</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>pid</structfield> <type>integer</type>
+ </para>
+ <para>
+ Process ID of backend.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>datid</structfield> <type>text</type>
+ </para>
+ <para>
+ OID of the database to which this backend is connected.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>datname</structfield> <type>name</type>
+ </para>
+ <para>
+ Name of the database to which this backend is connected.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>relid</structfield> <type>oid</type>
+ </para>
+ <para>
+ OID of the table on which the <command>COPY</command> command is executed.
+ It is set to 0 if <command>SELECT</command> query is provided.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>bytes_processed</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of bytes already processed by <command>COPY</command> command.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>bytes_total</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Size of source file for <command>COPY FROM</command> command in bytes.
+ It is set to 0 if not available.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>lines_processed</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of lines already processed by <command>COPY</command> command.
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </sect2>
+
</sect1>
<sect1 id="dynamic-trace">
S.param5 AS tablespaces_streamed
FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
+
+CREATE VIEW pg_stat_progress_copy AS
+ SELECT
+ S.pid AS pid, S.datid AS datid, D.datname AS datname,
+ S.relid AS relid,
+ S.param1 AS bytes_processed,
+ S.param2 AS bytes_total,
+ S.param3 AS lines_processed
+ FROM pg_stat_get_progress_info('COPY') AS S
+ LEFT JOIN pg_database D ON S.datid = D.oid;
+
CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
- * for counting tuples inserted by an INSERT command.
+ * for counting tuples inserted by an INSERT command. Update
+ * progress of the COPY command as well.
*/
- processed++;
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}
}
}
}
+
+ /* initialize progress */
+ pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+ cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ cstate->bytes_processed = 0;
+
/* We keep those variables in cstate. */
cstate->in_functions = in_functions;
cstate->typioparams = typioparams;
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
+
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
}
}
cstate->filename)));
}
+ pgstat_progress_end_command();
+
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "port/pg_bswap.h"
#include "utils/memutils.h"
#include "utils/rel.h"
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes;
+ cstate->bytes_processed += nbytes;
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
return (inbytes > 0);
}
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
+#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
+ uint64 bytes_processed; /* number of bytes processed so far */
} CopyToStateData;
break;
}
+ /* Update the progress */
+ cstate->bytes_processed += fe_msgbuf->len;
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
resetStringInfo(fe_msgbuf);
}
cstate->filename)));
}
+ pgstat_progress_end_command();
+
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
}
}
}
+ /* initialize progress */
+ pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+ cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ cstate->bytes_processed = 0;
+
MemoryContextSwitchTo(oldcontext);
return cstate;
/* Format and send the data */
CopyOneRowTo(cstate, slot);
- processed++;
+
+ /* Increment amount of processed tuples and update the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
}
ExecDropSingleTupleTableSlot(slot);
/* Send the data */
CopyOneRowTo(cstate, slot);
- myState->processed++;
+
+ /* Increment amount of processed tuples and update the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
return true;
}
cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+ else if (pg_strcasecmp(cmd, "COPY") == 0)
+ cmdtype = PROGRESS_COMMAND_COPY;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
+ uint64 bytes_processed;/* number of bytes processed so far */
/* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
} CopyFromStateData;
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
+/* Commands of PROGRESS_COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
#endif
PROGRESS_COMMAND_ANALYZE,
PROGRESS_COMMAND_CLUSTER,
PROGRESS_COMMAND_CREATE_INDEX,
- PROGRESS_COMMAND_BASEBACKUP
+ PROGRESS_COMMAND_BASEBACKUP,
+ PROGRESS_COMMAND_COPY
} ProgressCommandType;
#define PGSTAT_NUM_PROGRESS_PARAM 20
s.param8 AS index_rebuild_count
FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+ s.datid,
+ d.datname,
+ s.relid,
+ s.param1 AS bytes_processed,
+ s.param2 AS bytes_total,
+ s.param3 AS lines_processed
+ FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+ LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,
s.datid,
d.datname,