libpq: Fix sending queries in pipeline aborted state
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 9 Jul 2021 19:57:59 +0000 (15:57 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 9 Jul 2021 19:57:59 +0000 (15:57 -0400)
When sending queries in pipeline mode, we were careless about leaving
the connection in the right state so that PQgetResult would behave
correctly; trying to read further results after sending a query after
having read a result with an error would sometimes hang.  Fix by
ensuring internal libpq state is changed properly.  All the state
changes were being done by the callers of pqAppendCmdQueueEntry(); it
would have become too repetitious to have this logic in each of them, so
instead put it all in that function and relieve callers of the
responsibility.

Add a test to verify this case.  Without the code fix, this new test
hangs sometimes.

Also, document that PQisBusy() would return false when no queries are
pending result.  This is not intuitively obvious, and NULL would be
obtained by calling PQgetResult() at that point, which is confusing.
Wording by Boris Kolpackov.

In passing, fix bogus use of "false" to mean "0", per Ranier Vilela.

Backpatch to 14.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: Boris Kolpackov <boris@codesynthesis.com>
Discussion: https://postgr.es/m/boris.20210624103805@codesynthesis.com

doc/src/sgml/libpq.sgml
src/interfaces/libpq/fe-exec.c
src/test/modules/libpq_pipeline/libpq_pipeline.c

index 59e3e678f9e7620a72ca5f74f76fec0beb2bd541..2e4f615a6591b80d382771b7fddfb71d26cae83e 100644 (file)
@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn);
 
     <para>
      <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
-     operate as normal when processing pipeline results.
+     operate as normal when processing pipeline results.  In particular,
+     a call to <function>PQisBusy</function> in the middle of a pipeline
+     returns 0 if the results for all the queries issued so far have been
+     consumed.
     </para>
 
     <para>
index b13ddab393be59b4258baa07c91dab77b53d12e0..aca81890bb1661d8e338c99230cb9e5b96220788 100644 (file)
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
 
 /*
  * pqAppendCmdQueueEntry
- *     Append a caller-allocated command queue entry to the queue.
+ *     Append a caller-allocated entry to the command queue, and update
+ *     conn->asyncStatus to account for it.
  *
  * The query itself must already have been put in the output buffer by the
  * caller.
@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
        conn->cmd_queue_tail->next = entry;
 
    conn->cmd_queue_tail = entry;
+
+   switch (conn->pipelineStatus)
+   {
+       case PQ_PIPELINE_OFF:
+       case PQ_PIPELINE_ON:
+
+           /*
+            * When not in pipeline aborted state, if there's a result ready
+            * to be consumed, let it be so (that is, don't change away from
+            * READY or READY_MORE); otherwise set us busy to wait for
+            * something to arrive from the server.
+            */
+           if (conn->asyncStatus == PGASYNC_IDLE)
+               conn->asyncStatus = PGASYNC_BUSY;
+           break;
+
+       case PQ_PIPELINE_ABORTED:
+
+           /*
+            * In aborted pipeline state, we don't expect anything from the
+            * server (since we don't send any queries that are queued).
+            * Therefore, if IDLE then do what PQgetResult would do to let
+            * itself consume commands from the queue; if we're in any other
+            * state, we don't have to do anything.
+            */
+           if (conn->asyncStatus == PGASYNC_IDLE)
+           {
+               resetPQExpBuffer(&conn->errorMessage);
+               pqPipelineProcessQueue(conn);
+           }
+           break;
+   }
 }
 
 /*
@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   conn->asyncStatus = PGASYNC_BUSY;
    return 1;
 
 sendFailed:
@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
    /* if insufficient memory, query just winds up NULL */
    entry->query = strdup(query);
 
-   pqAppendCmdQueueEntry(conn, entry);
-
-   conn->asyncStatus = PGASYNC_BUSY;
-
    /*
     * Give the data a push (in pipeline mode, only if we're past the size
     * threshold).  In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
    if (pqPipelineFlush(conn) < 0)
        goto sendFailed;
 
+   /* OK, it's launched! */
+   pqAppendCmdQueueEntry(conn, entry);
+
    return 1;
 
 sendFailed:
@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   conn->asyncStatus = PGASYNC_BUSY;
+
    return 1;
 
 sendFailed:
@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   conn->asyncStatus = PGASYNC_BUSY;
+
    return 1;
 
 sendFailed:
@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
  * pqPipelineProcessQueue: subroutine for PQgetResult
  *     In pipeline mode, start processing the results of the next query in the queue.
  */
-void
+static void
 pqPipelineProcessQueue(PGconn *conn)
 {
    switch (conn->asyncStatus)
@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
        pqPutMsgEnd(conn) < 0)
        goto sendFailed;
 
-   pqAppendCmdQueueEntry(conn, entry);
-
    /*
     * Give the data a push.  In nonblock mode, don't complain if we're unable
     * to send it all; PQgetResult() will do any additional flushing needed.
     */
    if (PQflush(conn) < 0)
        goto sendFailed;
-   conn->asyncStatus = PGASYNC_BUSY;
+
+   /* OK, it's launched! */
+   pqAppendCmdQueueEntry(conn, entry);
 
    return 1;
 
@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
    {
        appendPQExpBufferStr(&conn->errorMessage,
                             libpq_gettext("another command is already in progress\n"));
-       return false;
+       return 0;
    }
 
    if (pqPutMsgStart('H', conn) < 0 ||
index 249ee22105c18f82e840a1fbc96675be07f46561..c27c4e0adaf052749574681dc4d2c50ccab9b08b 100644 (file)
@@ -28,6 +28,8 @@
 
 
 static void exit_nicely(PGconn *conn);
+static bool process_result(PGconn *conn, PGresult *res, int results,
+                          int numsent);
 
 const char *const progname = "libpq_pipeline";
 
@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
    fprintf(stderr, "ok\n");
 }
 
+/*
+ * In this test mode we send a stream of queries, with one in the middle
+ * causing an error.  Verify that we can still send some more after the
+ * error and have libpq work properly.
+ */
+static void
+test_uniqviol(PGconn *conn)
+{
+   int         sock = PQsocket(conn);
+   PGresult   *res;
+   Oid         paramTypes[2] = {INT8OID, INT8OID};
+   const char *paramValues[2];
+   char        paramValue0[MAXINT8LEN];
+   char        paramValue1[MAXINT8LEN];
+   int         ctr = 0;
+   int         numsent = 0;
+   int         results = 0;
+   bool        read_done = false;
+   bool        write_done = false;
+   bool        error_sent = false;
+   bool        got_error = false;
+   int         switched = 0;
+   int         socketful = 0;
+   fd_set      in_fds;
+   fd_set      out_fds;
+
+   fprintf(stderr, "uniqviol ...");
+
+   PQsetnonblocking(conn, 1);
+
+   paramValues[0] = paramValue0;
+   paramValues[1] = paramValue1;
+   sprintf(paramValue1, "42");
+
+   res = PQexec(conn, "drop table if exists ppln_uniqviol;"
+                "create table ppln_uniqviol(id bigint primary key, idata bigint)");
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       pg_fatal("failed to create table: %s", PQerrorMessage(conn));
+
+   res = PQexec(conn, "begin");
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
+
+   res = PQprepare(conn, "insertion",
+                   "insert into ppln_uniqviol values ($1, $2) returning id",
+                   2, paramTypes);
+   if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+       pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
+
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("failed to enter pipeline mode");
+
+   while (!read_done)
+   {
+       /*
+        * Avoid deadlocks by reading everything the server has sent before
+        * sending anything.  (Special precaution is needed here to process
+        * PQisBusy before testing the socket for read-readiness, because the
+        * socket does not turn read-ready after "sending" queries in aborted
+        * pipeline mode.)
+        */
+       while (PQisBusy(conn) == 0)
+       {
+           bool        new_error;
+
+           if (results >= numsent)
+           {
+               if (write_done)
+                   read_done = true;
+               break;
+           }
+
+           res = PQgetResult(conn);
+           new_error = process_result(conn, res, results, numsent);
+           if (new_error && got_error)
+               pg_fatal("got two errors");
+           got_error |= new_error;
+           if (results++ >= numsent - 1)
+           {
+               if (write_done)
+                   read_done = true;
+               break;
+           }
+       }
+
+       if (read_done)
+           break;
+
+       FD_ZERO(&out_fds);
+       FD_SET(sock, &out_fds);
+
+       FD_ZERO(&in_fds);
+       FD_SET(sock, &in_fds);
+
+       if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
+       {
+           if (errno == EINTR)
+               continue;
+           pg_fatal("select() failed: %m");
+       }
+
+       if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
+           pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
+
+       /*
+        * If the socket is writable and we haven't finished sending queries,
+        * send some.
+        */
+       if (!write_done && FD_ISSET(sock, &out_fds))
+       {
+           for (;;)
+           {
+               int         flush;
+
+               /*
+                * provoke uniqueness violation exactly once after having
+                * switched to read mode.
+                */
+               if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
+               {
+                   sprintf(paramValue0, "%d", numsent / 2);
+                   fprintf(stderr, "E");
+                   error_sent = true;
+               }
+               else
+               {
+                   fprintf(stderr, ".");
+                   sprintf(paramValue0, "%d", ctr++);
+               }
+
+               if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
+                   pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
+               numsent++;
+
+               /* Are we done writing? */
+               if (socketful != 0 && numsent % socketful == 42 && error_sent)
+               {
+                   if (PQsendFlushRequest(conn) != 1)
+                       pg_fatal("failed to send flush request");
+                   write_done = true;
+                   fprintf(stderr, "\ndone writing\n");
+                   PQflush(conn);
+                   break;
+               }
+
+               /* is the outgoing socket full? */
+               flush = PQflush(conn);
+               if (flush == -1)
+                   pg_fatal("failed to flush: %s", PQerrorMessage(conn));
+               if (flush == 1)
+               {
+                   if (socketful == 0)
+                       socketful = numsent;
+                   fprintf(stderr, "\nswitch to reading\n");
+                   switched++;
+                   break;
+               }
+           }
+       }
+   }
+
+   if (!got_error)
+       pg_fatal("did not get expected error");
+
+   fprintf(stderr, "ok\n");
+}
+
+/*
+ * Subroutine for test_uniqviol; given a PGresult, print it out and consume
+ * the expected NULL that should follow it.
+ *
+ * Returns true if we read a fatal error message, otherwise false.
+ */
+static bool
+process_result(PGconn *conn, PGresult *res, int results, int numsent)
+{
+   PGresult   *res2;
+   bool        got_error = false;
+
+   if (res == NULL)
+       pg_fatal("got unexpected NULL");
+
+   switch (PQresultStatus(res))
+   {
+       case PGRES_FATAL_ERROR:
+           got_error = true;
+           fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
+           PQclear(res);
+
+           res2 = PQgetResult(conn);
+           if (res2 != NULL)
+               pg_fatal("expected NULL, got %s",
+                        PQresStatus(PQresultStatus(res2)));
+           break;
+
+       case PGRES_TUPLES_OK:
+           fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
+           PQclear(res);
+
+           res2 = PQgetResult(conn);
+           if (res2 != NULL)
+               pg_fatal("expected NULL, got %s",
+                        PQresStatus(PQresultStatus(res2)));
+           break;
+
+       case PGRES_PIPELINE_ABORTED:
+           fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
+           res2 = PQgetResult(conn);
+           if (res2 != NULL)
+               pg_fatal("expected NULL, got %s",
+                        PQresStatus(PQresultStatus(res2)));
+           break;
+
+       default:
+           pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
+   }
+
+   return got_error;
+}
+
+
 static void
 usage(const char *progname)
 {
@@ -1331,6 +1554,7 @@ print_test_list(void)
    printf("simple_pipeline\n");
    printf("singlerow\n");
    printf("transaction\n");
+   printf("uniqviol\n");
 }
 
 int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
        test_singlerowmode(conn);
    else if (strcmp(testname, "transaction") == 0)
        test_transaction(conn);
+   else if (strcmp(testname, "uniqviol") == 0)
+       test_uniqviol(conn);
    else
    {
        fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);