Re-implement psql's FETCH_COUNT feature atop libpq's chunked mode.
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Apr 2024 00:45:05 +0000 (20:45 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Apr 2024 00:45:11 +0000 (20:45 -0400)
Formerly this was done with a cursor, which is problematic since
not all result-set-returning query types can be put into a cursor.
The new implementation is better integrated into other psql
features, too.

Daniel Vérité, reviewed by Laurenz Albe and myself (and whacked
around a bit by me, so any remaining bugs are my fault)

Discussion: https://postgr.es/m/CAKZiRmxsVTkO928CM+-ADvsMyePmU3L9DQCa9NwqjvLPcEe5QA@mail.gmail.com

src/bin/psql/common.c
src/bin/psql/t/001_basic.pl
src/test/regress/expected/psql.out
src/test/regress/sql/psql.sql

index 2830bde49514428369797bdd4f0795ea40888061..3dc6dc45f9cd0b91f7c6a83d12fad133680d883a 100644 (file)
@@ -31,7 +31,6 @@
 #include "settings.h"
 
 static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
 static int ExecQueryAndProcessResults(const char *query,
                                       double *elapsed_msec,
                                       bool *svpt_gone_p,
@@ -40,7 +39,6 @@ static int    ExecQueryAndProcessResults(const char *query,
                                       const printQueryOpt *opt,
                                       FILE *printQueryFout);
 static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
 
 
 /*
@@ -83,6 +81,46 @@ openQueryOutputFile(const char *fname, FILE **fout, bool *is_pipe)
    return true;
 }
 
+/*
+ * Check if an output stream for \g needs to be opened, and if yes,
+ * open it and update the caller's gfile_fout and is_pipe state variables.
+ * Return true if OK, false if an error occurred.
+ */
+static bool
+SetupGOutput(FILE **gfile_fout, bool *is_pipe)
+{
+   /* If there is a \g file or program, and it's not already open, open it */
+   if (pset.gfname != NULL && *gfile_fout == NULL)
+   {
+       if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+       {
+           if (*is_pipe)
+               disable_sigpipe_trap();
+       }
+       else
+           return false;
+   }
+   return true;
+}
+
+/*
+ * Close the output stream for \g, if we opened it.
+ */
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+   if (gfile_fout)
+   {
+       if (is_pipe)
+       {
+           SetShellResultVariables(pclose(gfile_fout));
+           restore_sigpipe_trap();
+       }
+       else
+           fclose(gfile_fout);
+   }
+}
+
 /*
  * setQFout
  * -- handler for -o command line option and \o command
@@ -373,6 +411,7 @@ AcceptResult(const PGresult *result, bool show_error)
        {
            case PGRES_COMMAND_OK:
            case PGRES_TUPLES_OK:
+           case PGRES_TUPLES_CHUNK:
            case PGRES_EMPTY_QUERY:
            case PGRES_COPY_IN:
            case PGRES_COPY_OUT:
@@ -1135,16 +1174,10 @@ SendQuery(const char *query)
        /* Describe query's result columns, without executing it */
        OK = DescribeQuery(query, &elapsed_msec);
    }
-   else if (pset.fetch_count <= 0 || pset.gexec_flag ||
-            pset.crosstab_flag || !is_select_command(query))
-   {
-       /* Default fetch-it-all-and-print mode */
-       OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
-   }
    else
    {
-       /* Fetch-in-segments mode */
-       OK = ExecQueryUsingCursor(query, &elapsed_msec);
+       /* Default fetch-and-print mode */
+       OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
    }
 
    if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1454,6 +1487,21 @@ ExecQueryAndProcessResults(const char *query,
        return -1;
    }
 
+   /*
+    * Fetch the result in chunks if FETCH_COUNT is set.  But we don't enable
+    * chunking if SHOW_ALL_RESULTS is false, since that requires us to
+    * accumulate all rows before we can tell what should be displayed, which
+    * would counter the idea of FETCH_COUNT.  Chunking is also disabled when
+    * \crosstab, \gexec, \gset or \watch is used.
+    */
+   if (pset.fetch_count > 0 && pset.show_all_results &&
+       !pset.crosstab_flag && !pset.gexec_flag &&
+       !pset.gset_prefix && !is_watch)
+   {
+       if (!PQsetChunkedRowsMode(pset.db, pset.fetch_count))
+           pg_log_warning("fetching results in chunked mode failed");
+   }
+
    /*
     * If SIGINT is sent while the query is processing, the interrupt will be
     * consumed.  The user's intention, though, is to cancel the entire watch
@@ -1475,6 +1523,7 @@ ExecQueryAndProcessResults(const char *query,
    while (result != NULL)
    {
        ExecStatusType result_status;
+       bool        is_chunked_result = false;
        PGresult   *next_result;
        bool        last;
 
@@ -1572,20 +1621,9 @@ ExecQueryAndProcessResults(const char *query,
                }
                else if (pset.gfname)
                {
-                   /* send to \g file, which we may have opened already */
-                   if (gfile_fout == NULL)
-                   {
-                       if (openQueryOutputFile(pset.gfname,
-                                               &gfile_fout, &gfile_is_pipe))
-                       {
-                           if (gfile_is_pipe)
-                               disable_sigpipe_trap();
-                           copy_stream = gfile_fout;
-                       }
-                       else
-                           success = false;
-                   }
-                   else
+                   /* COPY followed by \g filename or \g |program */
+                   success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+                   if (gfile_fout)
                        copy_stream = gfile_fout;
                }
                else
@@ -1603,6 +1641,101 @@ ExecQueryAndProcessResults(const char *query,
            success &= HandleCopyResult(&result, copy_stream);
        }
 
+       /* If we have a chunked result, collect and print all chunks */
+       if (result_status == PGRES_TUPLES_CHUNK)
+       {
+           FILE       *tuples_fout = printQueryFout ? printQueryFout : stdout;
+           printQueryOpt my_popt = pset.popt;
+           int64       total_tuples = 0;
+           bool        is_pager = false;
+           int         flush_error = 0;
+
+           /* initialize print options for partial table output */
+           my_popt.topt.start_table = true;
+           my_popt.topt.stop_table = false;
+           my_popt.topt.prior_records = 0;
+
+           /* open \g file if needed */
+           success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+           if (gfile_fout)
+               tuples_fout = gfile_fout;
+
+           /* force use of pager for any chunked resultset going to stdout */
+           if (success && tuples_fout == stdout)
+           {
+               tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+               is_pager = true;
+           }
+
+           do
+           {
+               /*
+                * display the current chunk of results, unless the output
+                * stream stopped working or we got cancelled
+                */
+               if (success && !flush_error && !cancel_pressed)
+               {
+                   printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+                   flush_error = fflush(tuples_fout);
+               }
+
+               /* after the first result set, disallow header decoration */
+               my_popt.topt.start_table = false;
+
+               /* count tuples before dropping the result */
+               my_popt.topt.prior_records += PQntuples(result);
+               total_tuples += PQntuples(result);
+
+               ClearOrSaveResult(result);
+
+               /* get the next result, loop if it's PGRES_TUPLES_CHUNK */
+               result = PQgetResult(pset.db);
+           } while (PQresultStatus(result) == PGRES_TUPLES_CHUNK);
+
+           /* We expect an empty PGRES_TUPLES_OK, else there's a problem */
+           if (PQresultStatus(result) == PGRES_TUPLES_OK)
+           {
+               char        buf[32];
+
+               Assert(PQntuples(result) == 0);
+
+               /* Display the footer using the empty result */
+               if (success && !flush_error && !cancel_pressed)
+               {
+                   my_popt.topt.stop_table = true;
+                   printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+                   fflush(tuples_fout);
+               }
+
+               if (is_pager)
+                   ClosePager(tuples_fout);
+
+               /*
+                * We must do a fake SetResultVariables(), since we don't have
+                * a PGresult corresponding to the whole query.
+                */
+               SetVariable(pset.vars, "ERROR", "false");
+               SetVariable(pset.vars, "SQLSTATE", "00000");
+               snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+               SetVariable(pset.vars, "ROW_COUNT", buf);
+               /* Prevent SetResultVariables call below */
+               is_chunked_result = true;
+
+               /* Clear the empty result so it isn't printed below */
+               ClearOrSaveResult(result);
+               result = NULL;
+           }
+           else
+           {
+               /* Probably an error report, so close the pager and print it */
+               if (is_pager)
+                   ClosePager(tuples_fout);
+
+               success &= AcceptResult(result, true);
+               /* SetResultVariables and ClearOrSaveResult happen below */
+           }
+       }
+
        /*
         * Check PQgetResult() again.  In the typical case of a single-command
         * string, it will return NULL.  Otherwise, we'll have other results
@@ -1640,31 +1773,18 @@ ExecQueryAndProcessResults(const char *query,
             * tuple output, but it's still used for status output.
             */
            FILE       *tuples_fout = printQueryFout;
-           bool        do_print = true;
 
-           if (PQresultStatus(result) == PGRES_TUPLES_OK &&
-               pset.gfname)
-           {
-               if (gfile_fout == NULL)
-               {
-                   if (openQueryOutputFile(pset.gfname,
-                                           &gfile_fout, &gfile_is_pipe))
-                   {
-                       if (gfile_is_pipe)
-                           disable_sigpipe_trap();
-                   }
-                   else
-                       success = do_print = false;
-               }
+           if (PQresultStatus(result) == PGRES_TUPLES_OK)
+               success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+           if (gfile_fout)
                tuples_fout = gfile_fout;
-           }
-           if (do_print)
+           if (success)
                success &= PrintQueryResult(result, last, opt,
                                            tuples_fout, printQueryFout);
        }
 
-       /* set variables from last result */
-       if (!is_watch && last)
+       /* set variables from last result, unless dealt with elsewhere */
+       if (last && !is_watch && !is_chunked_result)
            SetResultVariables(result, success);
 
        ClearOrSaveResult(result);
@@ -1678,16 +1798,7 @@ ExecQueryAndProcessResults(const char *query,
    }
 
    /* close \g file if we opened it */
-   if (gfile_fout)
-   {
-       if (gfile_is_pipe)
-       {
-           SetShellResultVariables(pclose(gfile_fout));
-           restore_sigpipe_trap();
-       }
-       else
-           fclose(gfile_fout);
-   }
+   CloseGOutput(gfile_fout, gfile_is_pipe);
 
    /* may need this to recover from conn loss during COPY */
    if (!CheckConnection())
@@ -1700,274 +1811,6 @@ ExecQueryAndProcessResults(const char *query,
 }
 
 
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
-   bool        OK = true;
-   PGresult   *result;
-   PQExpBufferData buf;
-   printQueryOpt my_popt = pset.popt;
-   bool        timing = pset.timing;
-   FILE       *fout;
-   bool        is_pipe;
-   bool        is_pager = false;
-   bool        started_txn = false;
-   int64       total_tuples = 0;
-   int         ntuples;
-   int         fetch_count;
-   char        fetch_cmd[64];
-   instr_time  before,
-               after;
-   int         flush_error;
-
-   *elapsed_msec = 0;
-
-   /* initialize print options for partial table output */
-   my_popt.topt.start_table = true;
-   my_popt.topt.stop_table = false;
-   my_popt.topt.prior_records = 0;
-
-   if (timing)
-       INSTR_TIME_SET_CURRENT(before);
-   else
-       INSTR_TIME_SET_ZERO(before);
-
-   /* if we're not in a transaction, start one */
-   if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
-   {
-       result = PQexec(pset.db, "BEGIN");
-       OK = AcceptResult(result, true) &&
-           (PQresultStatus(result) == PGRES_COMMAND_OK);
-       ClearOrSaveResult(result);
-       if (!OK)
-           return false;
-       started_txn = true;
-   }
-
-   /* Send DECLARE CURSOR */
-   initPQExpBuffer(&buf);
-   appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
-                     query);
-
-   result = PQexec(pset.db, buf.data);
-   OK = AcceptResult(result, true) &&
-       (PQresultStatus(result) == PGRES_COMMAND_OK);
-   if (!OK)
-       SetResultVariables(result, OK);
-   ClearOrSaveResult(result);
-   termPQExpBuffer(&buf);
-   if (!OK)
-       goto cleanup;
-
-   if (timing)
-   {
-       INSTR_TIME_SET_CURRENT(after);
-       INSTR_TIME_SUBTRACT(after, before);
-       *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-   }
-
-   /*
-    * In \gset mode, we force the fetch count to be 2, so that we will throw
-    * the appropriate error if the query returns more than one row.
-    */
-   if (pset.gset_prefix)
-       fetch_count = 2;
-   else
-       fetch_count = pset.fetch_count;
-
-   snprintf(fetch_cmd, sizeof(fetch_cmd),
-            "FETCH FORWARD %d FROM _psql_cursor",
-            fetch_count);
-
-   /* prepare to write output to \g argument, if any */
-   if (pset.gfname)
-   {
-       if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
-       {
-           OK = false;
-           goto cleanup;
-       }
-       if (is_pipe)
-           disable_sigpipe_trap();
-   }
-   else
-   {
-       fout = pset.queryFout;
-       is_pipe = false;        /* doesn't matter */
-   }
-
-   /* clear any pre-existing error indication on the output stream */
-   clearerr(fout);
-
-   for (;;)
-   {
-       if (timing)
-           INSTR_TIME_SET_CURRENT(before);
-
-       /* get fetch_count tuples at a time */
-       result = PQexec(pset.db, fetch_cmd);
-
-       if (timing)
-       {
-           INSTR_TIME_SET_CURRENT(after);
-           INSTR_TIME_SUBTRACT(after, before);
-           *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-       }
-
-       if (PQresultStatus(result) != PGRES_TUPLES_OK)
-       {
-           /* shut down pager before printing error message */
-           if (is_pager)
-           {
-               ClosePager(fout);
-               is_pager = false;
-           }
-
-           OK = AcceptResult(result, true);
-           Assert(!OK);
-           SetResultVariables(result, OK);
-           ClearOrSaveResult(result);
-           break;
-       }
-
-       if (pset.gset_prefix)
-       {
-           /* StoreQueryTuple will complain if not exactly one row */
-           OK = StoreQueryTuple(result);
-           ClearOrSaveResult(result);
-           break;
-       }
-
-       /*
-        * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
-        */
-
-       ntuples = PQntuples(result);
-       total_tuples += ntuples;
-
-       if (ntuples < fetch_count)
-       {
-           /* this is the last result set, so allow footer decoration */
-           my_popt.topt.stop_table = true;
-       }
-       else if (fout == stdout && !is_pager)
-       {
-           /*
-            * If query requires multiple result sets, hack to ensure that
-            * only one pager instance is used for the whole mess
-            */
-           fout = PageOutput(INT_MAX, &(my_popt.topt));
-           is_pager = true;
-       }
-
-       printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
-       ClearOrSaveResult(result);
-
-       /* after the first result set, disallow header decoration */
-       my_popt.topt.start_table = false;
-       my_popt.topt.prior_records += ntuples;
-
-       /*
-        * Make sure to flush the output stream, so intermediate results are
-        * visible to the client immediately.  We check the results because if
-        * the pager dies/exits/etc, there's no sense throwing more data at
-        * it.
-        */
-       flush_error = fflush(fout);
-
-       /*
-        * Check if we are at the end, if a cancel was pressed, or if there
-        * were any errors either trying to flush out the results, or more
-        * generally on the output stream at all.  If we hit any errors
-        * writing things to the stream, we presume $PAGER has disappeared and
-        * stop bothering to pull down more data.
-        */
-       if (ntuples < fetch_count || cancel_pressed || flush_error ||
-           ferror(fout))
-           break;
-   }
-
-   if (pset.gfname)
-   {
-       /* close \g argument file/pipe */
-       if (is_pipe)
-       {
-           SetShellResultVariables(pclose(fout));
-           restore_sigpipe_trap();
-       }
-       else
-           fclose(fout);
-   }
-   else if (is_pager)
-   {
-       /* close transient pager */
-       ClosePager(fout);
-   }
-
-   if (OK)
-   {
-       /*
-        * We don't have a PGresult here, and even if we did it wouldn't have
-        * the right row count, so fake SetResultVariables().  In error cases,
-        * we already set the result variables above.
-        */
-       char        buf[32];
-
-       SetVariable(pset.vars, "ERROR", "false");
-       SetVariable(pset.vars, "SQLSTATE", "00000");
-       snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
-       SetVariable(pset.vars, "ROW_COUNT", buf);
-   }
-
-cleanup:
-   if (timing)
-       INSTR_TIME_SET_CURRENT(before);
-
-   /*
-    * We try to close the cursor on either success or failure, but on failure
-    * ignore the result (it's probably just a bleat about being in an aborted
-    * transaction)
-    */
-   result = PQexec(pset.db, "CLOSE _psql_cursor");
-   if (OK)
-   {
-       OK = AcceptResult(result, true) &&
-           (PQresultStatus(result) == PGRES_COMMAND_OK);
-       ClearOrSaveResult(result);
-   }
-   else
-       PQclear(result);
-
-   if (started_txn)
-   {
-       result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
-       OK &= AcceptResult(result, true) &&
-           (PQresultStatus(result) == PGRES_COMMAND_OK);
-       ClearOrSaveResult(result);
-   }
-
-   if (timing)
-   {
-       INSTR_TIME_SET_CURRENT(after);
-       INSTR_TIME_SUBTRACT(after, before);
-       *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-   }
-
-   return OK;
-}
-
-
 /*
  * Advance the given char pointer over white space and SQL comments.
  */
@@ -2247,43 +2090,6 @@ command_no_begin(const char *query)
 }
 
 
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
-   int         wordlen;
-
-   /*
-    * First advance over any whitespace, comments and left parentheses.
-    */
-   for (;;)
-   {
-       query = skip_white_space(query);
-       if (query[0] == '(')
-           query++;
-       else
-           break;
-   }
-
-   /*
-    * Check word length (since "selectx" is not "select").
-    */
-   wordlen = 0;
-   while (isalpha((unsigned char) query[wordlen]))
-       wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
-   if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
-       return true;
-
-   if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
-       return true;
-
-   return false;
-}
-
-
 /*
  * Test if the current user is a database superuser.
  */
index 9f0b6cf8ca1faed27294ff53aa5a238ff04041d8..b5fedbc091c299fffb0db013dcf7573a8aff6943 100644 (file)
@@ -161,7 +161,7 @@ psql_like(
    '\errverbose with no previous error');
 
 # There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
 # and using \gdesc.  Test them all.
 
 like(
@@ -184,10 +184,10 @@ like(
            "\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
            on_error_stop => 0))[2],
    qr/\A^psql:<stdin>:2: ERROR:  .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
 ^ *^.*$
 ^psql:<stdin>:3: error: ERROR:  [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
 ^ *^.*$
 ^LOCATION: .*$/m,
    '\errverbose after FETCH_COUNT query with error');
index 69060fe3c003af03a08cb230eded485cee38ae31..0b8dd7abf2df63745911399b22df089920655dac 100644 (file)
@@ -4755,7 +4755,7 @@ number of rows: 0
 last error message: syntax error at end of input
 \echo 'last error code:' :LAST_ERROR_SQLSTATE
 last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
 \set FETCH_COUNT 10
 select unique2 from tenk1 order by unique2 limit 19;
  unique2 
@@ -4787,7 +4787,7 @@ error: false
 error code: 00000
 \echo 'number of rows:' :ROW_COUNT
 number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
 select 1/(15-unique2) from tenk1 order by unique2 limit 19;
  ?column? 
 ----------
index 129f8533537511edb9da59b32805033f523ac90c..33076cad79efd548f103c38c0cc39401990a9943 100644 (file)
@@ -1161,14 +1161,14 @@ SELECT 4 AS \gdesc
 \echo 'last error message:' :LAST_ERROR_MESSAGE
 \echo 'last error code:' :LAST_ERROR_SQLSTATE
 
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
 \set FETCH_COUNT 10
 select unique2 from tenk1 order by unique2 limit 19;
 \echo 'error:' :ERROR
 \echo 'error code:' :SQLSTATE
 \echo 'number of rows:' :ROW_COUNT
 
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
 select 1/(15-unique2) from tenk1 order by unique2 limit 19;
 \echo 'error:' :ERROR
 \echo 'error code:' :SQLSTATE