Report progress of COPY commands
authorTomas Vondra <tomas.vondra@postgresql.org>
Wed, 6 Jan 2021 20:46:26 +0000 (21:46 +0100)
committerTomas Vondra <tomas.vondra@postgresql.org>
Wed, 6 Jan 2021 20:51:06 +0000 (21:51 +0100)
This commit introduces a view pg_stat_progress_copy, reporting progress
of COPY commands.  This allows rough estimates of how far a running COPY
has progressed, with the caveat that the total number of bytes may not be
available in some cases (e.g. when the input comes from the client).

Author: Josef Šimánek
Reviewed-by: Fujii Masao, Bharath Rupireddy, Vignesh C, Matthias van de Meent
Discussion: https://postgr.es/m/CAFp7QwqMGEi4OyyaLEK9DR0+E+oK3UtA4bEjDVCa4bNkwUY2PQ@mail.gmail.com
Discussion: https://postgr.es/m/CAFp7Qwr6_FmRM6pCO0x_a0mymOfX_Gg+FEKet4XaTGSW=LitKQ@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/commands/copyfrom.c
src/backend/commands/copyfromparse.c
src/backend/commands/copyto.c
src/backend/utils/adt/pgstatfuncs.c
src/include/commands/copyfrom_internal.h
src/include/commands/progress.h
src/include/pgstat.h
src/test/regress/expected/rules.out

index 3d6c901306777caa56b6dfa6276d501f09b3d763..43fe8ae383eb923bdc6a7c31ece1178ec625cf3a 100644 (file)
@@ -399,6 +399,12 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
+      <entry>One row for each backend running <command>COPY</command>, showing current progress.
+       See <xref linkend='copy-progress-reporting'/>.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
    which support progress reporting are <command>ANALYZE</command>,
    <command>CLUSTER</command>,
    <command>CREATE INDEX</command>, <command>VACUUM</command>,
+   <command>COPY</command>,
    and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
    command that <xref linkend="app-pgbasebackup"/> issues to take
    a base backup).
@@ -6396,6 +6403,106 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
   </table>
 
  </sect2>
+
+ <sect2 id="copy-progress-reporting">
+  <title>COPY Progress Reporting</title>
+
+  <indexterm>
+   <primary>pg_stat_progress_copy</primary>
+  </indexterm>
+
+  <para>
+   Whenever <command>COPY</command> is running, the
+   <structname>pg_stat_progress_copy</structname> view will contain one row
+   for each backend that is currently running a <command>COPY</command> command.
+   The table below describes the information that will be reported and provides
+   information about how to interpret it.
+  </para>
+
+  <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
+   <title><structname>pg_stat_progress_copy</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of backend.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the table on which the <command>COPY</command> command is executed.
+       It is set to 0 if a <command>SELECT</command> query is provided.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of bytes already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Size of the source file for the <command>COPY FROM</command> command, in bytes.
+       It is set to 0 if not available.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>lines_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of lines already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect2>
+
  </sect1>
 
  <sect1 id="dynamic-trace">
index ab4603c69b8e84141ab4dbff933890be7dc2e5a3..5d89e77dbe2f56982dc382bfc20f397130e4c7a8 100644 (file)
@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
         S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+
+CREATE VIEW pg_stat_progress_copy AS  -- progress of COPY commands, decoded from the generic progress slots
+    SELECT
+        S.pid AS pid, S.datid AS datid, D.datname AS datname,
+        S.relid AS relid,  -- InvalidOid (0) when COPY was started without a target relation
+        S.param1 AS bytes_processed,  -- updated under PROGRESS_COPY_BYTES_PROCESSED (index 0)
+        S.param2 AS bytes_total,  -- updated under PROGRESS_COPY_BYTES_TOTAL (index 1); set from the input file size in BeginCopyFrom
+        S.param3 AS lines_processed  -- updated under PROGRESS_COPY_LINES_PROCESSED (index 2)
+    FROM pg_stat_get_progress_info('COPY') AS S
+        LEFT JOIN pg_database D ON S.datid = D.oid;  -- LEFT JOIN: a row is kept even if datid matches no pg_database entry (datname is then NULL)
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid       AS umid,
index 84a5045215b076034d384989a4b6fc3e602dd69d..08b6f782c735879f4f93e3c0a2945316eb7eab55 100644 (file)
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -35,6 +36,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
            /*
             * We count only tuples not suppressed by a BEFORE INSERT trigger
             * or FDW; this is the same definition used by nodeModifyTable.c
-            * for counting tuples inserted by an INSERT command.
+            * for counting tuples inserted by an INSERT command. Update
+            * progress of the COPY command as well.
             */
-           processed++;
+           pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
        }
    }
 
@@ -1415,6 +1418,12 @@ BeginCopyFrom(ParseState *pstate,
        }
    }
 
+
+   /* initialize progress */
+   pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+                                 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+   cstate->bytes_processed = 0;
+
    /* We keep those variables in cstate. */
    cstate->in_functions = in_functions;
    cstate->typioparams = typioparams;
@@ -1479,6 +1488,8 @@ BeginCopyFrom(ParseState *pstate,
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                         errmsg("\"%s\" is a directory", cstate->filename)));
+
+           pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
        }
    }
 
@@ -1522,6 +1533,8 @@ EndCopyFrom(CopyFromState cstate)
                            cstate->filename)));
    }
 
+   pgstat_progress_end_command();
+
    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
 }
index 4360b7788ea0bd9d140cc4272aa2d67f25b66789..4c74067f849ccda40470b0ea752f351ff7b6212e 100644 (file)
 
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
    cstate->raw_buf[nbytes] = '\0';
    cstate->raw_buf_index = 0;
    cstate->raw_buf_len = nbytes;
+   cstate->bytes_processed += nbytes;
+   pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
    return (inbytes > 0);
 }
 
index 51597ae523d1a746b4d527592d986c83196dc6d0..e04ec1e331b4b1ab4c698275e832fa83db530434 100644 (file)
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
 #include "executor/tuptable.h"
@@ -32,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -95,6 +97,7 @@ typedef struct CopyToStateData
 
    FmgrInfo   *out_functions;  /* lookup info for output functions */
    MemoryContext rowcontext;   /* per-row evaluation context */
+   uint64      bytes_processed;    /* number of bytes processed so far */
 
 } CopyToStateData;
 
@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
            break;
    }
 
+   /* Update the progress */
+   cstate->bytes_processed += fe_msgbuf->len;
+   pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
    resetStringInfo(fe_msgbuf);
 }
 
@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
                            cstate->filename)));
    }
 
+   pgstat_progress_end_command();
+
    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
 }
@@ -760,6 +769,11 @@ BeginCopyTo(ParseState *pstate,
        }
    }
 
+   /* initialize progress */
+   pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+                                 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+   cstate->bytes_processed = 0;
+
    MemoryContextSwitchTo(oldcontext);
 
    return cstate;
@@ -938,7 +952,9 @@ CopyTo(CopyToState cstate)
 
            /* Format and send the data */
            CopyOneRowTo(cstate, slot);
-           processed++;
+
+           /* Increment amount of processed tuples and update the progress */
+           pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
        }
 
        ExecDropSingleTupleTableSlot(slot);
@@ -1303,7 +1319,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
    /* Send the data */
    CopyOneRowTo(cstate, slot);
-   myState->processed++;
+
+   /* Increment amount of processed tuples and update the progress */
+   pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
    return true;
 }
index c9a1d4c56d9780c57ea55de7027a1fd9a44b5628..5c12a165a1535e8d90e12d8f2d31670054b5350c 100644 (file)
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
        cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
    else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
        cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+   else if (pg_strcasecmp(cmd, "COPY") == 0)
+       cmdtype = PROGRESS_COMMAND_COPY;
    else
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
index 5401a966d270777b868369781d34ea3f7de6723b..e37942df391e7bd24c655a21193b4a96693f294f 100644 (file)
@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
    char       *raw_buf;
    int         raw_buf_index;  /* next byte to process */
    int         raw_buf_len;    /* total # of bytes stored */
+   uint64      bytes_processed;/* number of bytes processed so far */
    /* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
index 49a158aabbf92dcc60193682b5942e70b8611998..95ec5d02e9cc47df7295dc6117db350604b8eeec 100644 (file)
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE     4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL         5
 
+/* Progress parameters for COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
 #endif
index 3a7e1997506e28abff317decf56e8aec4a7509b1..c38b68971019c2969a783461f7c79cf3f1b757e5 100644 (file)
@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
    PROGRESS_COMMAND_ANALYZE,
    PROGRESS_COMMAND_CLUSTER,
    PROGRESS_COMMAND_CREATE_INDEX,
-   PROGRESS_COMMAND_BASEBACKUP
+   PROGRESS_COMMAND_BASEBACKUP,
+   PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM  20
index 6293ab57bcf61fe3bad1d1a89bbd605d0e77925b..a687e99d1e4fe18d0c0cc4c71351b432b29080d3 100644 (file)
@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+    s.param1 AS bytes_processed,
+    s.param2 AS bytes_total,
+    s.param3 AS lines_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,