#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
/*
return true;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if yes,
+ * open it and update the caller's gfile_fout and is_pipe state variables.
+ * Return true if OK, false if an error occurred.
+ */
+static bool
+SetupGOutput(FILE **gfile_fout, bool *is_pipe)
+{
+ /* If there is a \g file or program, and it's not already open, open it */
+ if (pset.gfname != NULL && *gfile_fout == NULL)
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (*is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+/*
+ * Close the output stream for \g, if we opened it.
+ */
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
+
/*
* setQFout
* -- handler for -o command line option and \o command
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
- else if (pset.fetch_count <= 0 || pset.gexec_flag ||
- pset.crosstab_flag || !is_select_command(query))
- {
- /* Default fetch-it-all-and-print mode */
- OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
- }
else
{
- /* Fetch-in-segments mode */
- OK = ExecQueryUsingCursor(query, &elapsed_msec);
+ /* Default fetch-and-print mode */
+ OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
return -1;
}
+ /*
+ * Fetch the result in chunks if FETCH_COUNT is set. But we don't enable
+ * chunking if SHOW_ALL_RESULTS is false, since that requires us to
+ * accumulate all rows before we can tell what should be displayed, which
+ * would counter the idea of FETCH_COUNT. Chunking is also disabled when
+ * \crosstab, \gexec, \gset or \watch is used.
+ */
+ if (pset.fetch_count > 0 && pset.show_all_results &&
+ !pset.crosstab_flag && !pset.gexec_flag &&
+ !pset.gset_prefix && !is_watch)
+ {
+ if (!PQsetChunkedRowsMode(pset.db, pset.fetch_count))
+ pg_log_warning("fetching results in chunked mode failed");
+ }
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
while (result != NULL)
{
ExecStatusType result_status;
+ bool is_chunked_result = false;
PGresult *next_result;
bool last;
}
else if (pset.gfname)
{
- /* send to \g file, which we may have opened already */
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- copy_stream = gfile_fout;
- }
- else
- success = false;
- }
- else
+ /* COPY followed by \g filename or \g |program */
+ success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
copy_stream = gfile_fout;
}
else
success &= HandleCopyResult(&result, copy_stream);
}
+ /* If we have a chunked result, collect and print all chunks */
+ if (result_status == PGRES_TUPLES_CHUNK)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+ int64 total_tuples = 0;
+ bool is_pager = false;
+ int flush_error = 0;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ /* open \g file if needed */
+ success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* force use of pager for any chunked resultset going to stdout */
+ if (success && tuples_fout == stdout)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+
+ do
+ {
+ /*
+ * display the current chunk of results, unless the output
+ * stream stopped working or we got cancelled
+ */
+ if (success && !flush_error && !cancel_pressed)
+ {
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+
+ /* count tuples before dropping the result */
+ my_popt.topt.prior_records += PQntuples(result);
+ total_tuples += PQntuples(result);
+
+ ClearOrSaveResult(result);
+
+ /* get the next result, loop if it's PGRES_TUPLES_CHUNK */
+ result = PQgetResult(pset.db);
+ } while (PQresultStatus(result) == PGRES_TUPLES_CHUNK);
+
+ /* We expect an empty PGRES_TUPLES_OK, else there's a problem */
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ char buf[32];
+
+ Assert(PQntuples(result) == 0);
+
+ /* Display the footer using the empty result */
+ if (success && !flush_error && !cancel_pressed)
+ {
+ my_popt.topt.stop_table = true;
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ fflush(tuples_fout);
+ }
+
+ if (is_pager)
+ ClosePager(tuples_fout);
+
+ /*
+ * We must do a fake SetResultVariables(), since we don't have
+ * a PGresult corresponding to the whole query.
+ */
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ /* Prevent SetResultVariables call below */
+ is_chunked_result = true;
+
+ /* Clear the empty result so it isn't printed below */
+ ClearOrSaveResult(result);
+ result = NULL;
+ }
+ else
+ {
+ /* Probably an error report, so close the pager and print it */
+ if (is_pager)
+ ClosePager(tuples_fout);
+
+ success &= AcceptResult(result, true);
+ /* SetResultVariables and ClearOrSaveResult happen below */
+ }
+ }
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
- /* set variables from last result */
- if (!is_watch && last)
+ /* set variables from last result, unless dealt with elsewhere */
+ if (last && !is_watch && !is_chunked_result)
SetResultVariables(result, success);
ClearOrSaveResult(result);
}
/* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
}
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
- bool OK = true;
- PGresult *result;
- PQExpBufferData buf;
- printQueryOpt my_popt = pset.popt;
- bool timing = pset.timing;
- FILE *fout;
- bool is_pipe;
- bool is_pager = false;
- bool started_txn = false;
- int64 total_tuples = 0;
- int ntuples;
- int fetch_count;
- char fetch_cmd[64];
- instr_time before,
- after;
- int flush_error;
-
- *elapsed_msec = 0;
-
- /* initialize print options for partial table output */
- my_popt.topt.start_table = true;
- my_popt.topt.stop_table = false;
- my_popt.topt.prior_records = 0;
-
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
- else
- INSTR_TIME_SET_ZERO(before);
-
- /* if we're not in a transaction, start one */
- if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
- {
- result = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- if (!OK)
- return false;
- started_txn = true;
- }
-
- /* Send DECLARE CURSOR */
- initPQExpBuffer(&buf);
- appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
- query);
-
- result = PQexec(pset.db, buf.data);
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- if (!OK)
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- termPQExpBuffer(&buf);
- if (!OK)
- goto cleanup;
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- /*
- * In \gset mode, we force the fetch count to be 2, so that we will throw
- * the appropriate error if the query returns more than one row.
- */
- if (pset.gset_prefix)
- fetch_count = 2;
- else
- fetch_count = pset.fetch_count;
-
- snprintf(fetch_cmd, sizeof(fetch_cmd),
- "FETCH FORWARD %d FROM _psql_cursor",
- fetch_count);
-
- /* prepare to write output to \g argument, if any */
- if (pset.gfname)
- {
- if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
- {
- OK = false;
- goto cleanup;
- }
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- {
- fout = pset.queryFout;
- is_pipe = false; /* doesn't matter */
- }
-
- /* clear any pre-existing error indication on the output stream */
- clearerr(fout);
-
- for (;;)
- {
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /* get fetch_count tuples at a time */
- result = PQexec(pset.db, fetch_cmd);
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- if (PQresultStatus(result) != PGRES_TUPLES_OK)
- {
- /* shut down pager before printing error message */
- if (is_pager)
- {
- ClosePager(fout);
- is_pager = false;
- }
-
- OK = AcceptResult(result, true);
- Assert(!OK);
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- break;
- }
-
- if (pset.gset_prefix)
- {
- /* StoreQueryTuple will complain if not exactly one row */
- OK = StoreQueryTuple(result);
- ClearOrSaveResult(result);
- break;
- }
-
- /*
- * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
- */
-
- ntuples = PQntuples(result);
- total_tuples += ntuples;
-
- if (ntuples < fetch_count)
- {
- /* this is the last result set, so allow footer decoration */
- my_popt.topt.stop_table = true;
- }
- else if (fout == stdout && !is_pager)
- {
- /*
- * If query requires multiple result sets, hack to ensure that
- * only one pager instance is used for the whole mess
- */
- fout = PageOutput(INT_MAX, &(my_popt.topt));
- is_pager = true;
- }
-
- printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
- ClearOrSaveResult(result);
-
- /* after the first result set, disallow header decoration */
- my_popt.topt.start_table = false;
- my_popt.topt.prior_records += ntuples;
-
- /*
- * Make sure to flush the output stream, so intermediate results are
- * visible to the client immediately. We check the results because if
- * the pager dies/exits/etc, there's no sense throwing more data at
- * it.
- */
- flush_error = fflush(fout);
-
- /*
- * Check if we are at the end, if a cancel was pressed, or if there
- * were any errors either trying to flush out the results, or more
- * generally on the output stream at all. If we hit any errors
- * writing things to the stream, we presume $PAGER has disappeared and
- * stop bothering to pull down more data.
- */
- if (ntuples < fetch_count || cancel_pressed || flush_error ||
- ferror(fout))
- break;
- }
-
- if (pset.gfname)
- {
- /* close \g argument file/pipe */
- if (is_pipe)
- {
- SetShellResultVariables(pclose(fout));
- restore_sigpipe_trap();
- }
- else
- fclose(fout);
- }
- else if (is_pager)
- {
- /* close transient pager */
- ClosePager(fout);
- }
-
- if (OK)
- {
- /*
- * We don't have a PGresult here, and even if we did it wouldn't have
- * the right row count, so fake SetResultVariables(). In error cases,
- * we already set the result variables above.
- */
- char buf[32];
-
- SetVariable(pset.vars, "ERROR", "false");
- SetVariable(pset.vars, "SQLSTATE", "00000");
- snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
- SetVariable(pset.vars, "ROW_COUNT", buf);
- }
-
-cleanup:
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /*
- * We try to close the cursor on either success or failure, but on failure
- * ignore the result (it's probably just a bleat about being in an aborted
- * transaction)
- */
- result = PQexec(pset.db, "CLOSE _psql_cursor");
- if (OK)
- {
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
- else
- PQclear(result);
-
- if (started_txn)
- {
- result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- return OK;
-}
-
-
/*
* Advance the given char pointer over white space and SQL comments.
*/
}
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
- int wordlen;
-
- /*
- * First advance over any whitespace, comments and left parentheses.
- */
- for (;;)
- {
- query = skip_white_space(query);
- if (query[0] == '(')
- query++;
- else
- break;
- }
-
- /*
- * Check word length (since "selectx" is not "select").
- */
- wordlen = 0;
- while (isalpha((unsigned char) query[wordlen]))
- wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
- if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
- return true;
-
- if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
- return true;
-
- return false;
-}
-
-
/*
* Test if the current user is a database superuser.
*/