Make struct ParallelSlot private within pg_dump/parallel.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 27 Sep 2016 18:29:12 +0000 (14:29 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 27 Sep 2016 18:29:12 +0000 (14:29 -0400)
The only field of this struct that other files have any need to touch
is the pointer to the TocEntry a worker is working on.  (Well,
pg_backup_archiver.c is actually looking at workerStatus too, but that
can be finessed by specifying that the TocEntry pointer is NULL for a
non-busy worker.)

Hence, move out the TocEntry pointers to a separate array within
struct ParallelState, and then we can make struct ParallelSlot private.

I noted the possibility of this previously, but hadn't got round to
actually doing it.

Discussion: <1188.1464544443@sss.pgh.pa.us>

src/bin/pg_dump/parallel.c
src/bin/pg_dump/parallel.h
src/bin/pg_dump/pg_backup_archiver.c

index 0e2bfa106a741d99ff83d84459152c62d0c3ec87..5630dc626d75123465596e9cc96b7c054c0cb081 100644 (file)
@@ -45,6 +45,8 @@
  *     WRKR_IDLE: it's waiting for a command
  *     WRKR_WORKING: it's working on a command
  *     WRKR_TERMINATED: process ended
+ * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
+ * state, and must be NULL in other states.
  */
 
 #include "postgres_fe.h"
 
 #define NO_SLOT (-1)           /* Failure result for GetIdleWorker() */
 
+/* Worker process statuses */
+typedef enum
+{
+   WRKR_IDLE,
+   WRKR_WORKING,
+   WRKR_TERMINATED
+} T_WorkerStatus;
+
+/*
+ * Private per-parallel-worker state (typedef for this is in parallel.h).
+ *
+ * Much of this is valid only in the master process (or, on Windows, should
+ * be touched only by the master thread).  But the AH field should be touched
+ * only by workers.  The pipe descriptors are valid everywhere.
+ */
+struct ParallelSlot
+{
+   T_WorkerStatus workerStatus;    /* see enum above */
+
+   /* These fields are valid if workerStatus == WRKR_WORKING: */
+   ParallelCompletionPtr callback;     /* function to call on completion */
+   void       *callback_data;  /* passthru data for it */
+
+   ArchiveHandle *AH;          /* Archive data worker is using */
+
+   int         pipeRead;       /* master's end of the pipes */
+   int         pipeWrite;
+   int         pipeRevRead;    /* child's end of the pipes */
+   int         pipeRevWrite;
+
+   /* Child process/thread identity info: */
+#ifdef WIN32
+   uintptr_t   hThread;
+   unsigned int threadId;
+#else
+   pid_t       pid;
+#endif
+};
+
 #ifdef WIN32
 
 /*
@@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate)
        }
 #endif   /* WIN32 */
 
-       /* On all platforms, update workerStatus as well */
+       /* On all platforms, update workerStatus and te[] as well */
        Assert(j < pstate->numWorkers);
        slot->workerStatus = WRKR_TERMINATED;
+       pstate->te[j] = NULL;
    }
 }
 
@@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH)
 {
    ParallelState *pstate;
    int         i;
-   const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
 
    Assert(AH->public.numWorkers > 0);
 
    pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
 
    pstate->numWorkers = AH->public.numWorkers;
+   pstate->te = NULL;
    pstate->parallelSlot = NULL;
 
    if (AH->public.numWorkers == 1)
        return pstate;
 
-   pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
-   memset((void *) pstate->parallelSlot, 0, slotSize);
+   pstate->te = (TocEntry **)
+       pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
+   pstate->parallelSlot = (ParallelSlot *)
+       pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
 
 #ifdef WIN32
    /* Make fmtId() and fmtQualifiedId() use thread-local storage */
@@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH)
                          "could not create communication channels: %s\n",
                          strerror(errno));
 
+       pstate->te[i] = NULL;   /* just for safety */
+
        slot->workerStatus = WRKR_IDLE;
        slot->AH = NULL;
-       slot->te = NULL;
        slot->callback = NULL;
        slot->callback_data = NULL;
 
@@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
    set_cancel_pstate(NULL);
 
    /* Release state (mere neatnik-ism, since we're about to terminate) */
+   free(pstate->te);
    free(pstate->parallelSlot);
    free(pstate);
 }
@@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH,
 
    /* Remember worker is busy, and which TocEntry it's working on */
    pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
-   pstate->parallelSlot[worker].te = te;
    pstate->parallelSlot[worker].callback = callback;
    pstate->parallelSlot[worker].callback_data = callback_data;
+   pstate->te[worker] = te;
 }
 
 /*
@@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
    if (messageStartsWith(msg, "OK "))
    {
        ParallelSlot *slot = &pstate->parallelSlot[worker];
-       TocEntry   *te = slot->te;
+       TocEntry   *te = pstate->te[worker];
        int         status;
 
        status = parseWorkerResponse(AH, te, msg);
        slot->callback(AH, te, status, slot->callback_data);
        slot->workerStatus = WRKR_IDLE;
-       slot->te = NULL;
+       pstate->te[worker] = NULL;
    }
    else
        exit_horribly(modulename,
index 8ee629b10689dfd7c0298960d98ef9c0cf3f051f..e0c442cf37794363e5699afba02b9486c1ee7d5c 100644 (file)
@@ -33,51 +33,16 @@ typedef enum
    WFW_ALL_IDLE
 } WFW_WaitOption;
 
-/* Worker process statuses */
-typedef enum
-{
-   WRKR_IDLE,
-   WRKR_WORKING,
-   WRKR_TERMINATED
-} T_WorkerStatus;
-
-/*
- * Per-parallel-worker state of parallel.c.
- *
- * Much of this is valid only in the master process (or, on Windows, should
- * be touched only by the master thread).  But the AH field should be touched
- * only by workers.  The pipe descriptors are valid everywhere.
- */
-typedef struct ParallelSlot
-{
-   T_WorkerStatus workerStatus;    /* see enum above */
-
-   /* These fields are valid if workerStatus == WRKR_WORKING: */
-   TocEntry   *te;             /* item being worked on */
-   ParallelCompletionPtr callback;     /* function to call on completion */
-   void       *callback_data;  /* passthru data for it */
-
-   ArchiveHandle *AH;          /* Archive data worker is using */
-
-   int         pipeRead;       /* master's end of the pipes */
-   int         pipeWrite;
-   int         pipeRevRead;    /* child's end of the pipes */
-   int         pipeRevWrite;
-
-   /* Child process/thread identity info: */
-#ifdef WIN32
-   uintptr_t   hThread;
-   unsigned int threadId;
-#else
-   pid_t       pid;
-#endif
-} ParallelSlot;
+/* ParallelSlot is an opaque struct known only within parallel.c */
+typedef struct ParallelSlot ParallelSlot;
 
 /* Overall state for parallel.c */
 typedef struct ParallelState
 {
    int         numWorkers;     /* allowed number of workers */
-   ParallelSlot *parallelSlot; /* array of numWorkers slots */
+   /* these arrays have numWorkers entries, one per worker: */
+   TocEntry  **te;             /* item being worked on, or NULL */
+   ParallelSlot *parallelSlot; /* private info about each worker */
 } ParallelState;
 
 #ifdef WIN32
index e19c24aec94f61def094bb73f8ff4521c11ca227..bba8b6ca9f90813e5193b8e9ba785eef37679eae 100644 (file)
@@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
 
        for (k = 0; k < pstate->numWorkers; k++)
        {
-           if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
-               pstate->parallelSlot[k].te->section == SECTION_DATA)
+           TocEntry   *running_te = pstate->te[k];
+
+           if (running_te != NULL &&
+               running_te->section == SECTION_DATA)
                count++;
        }
        if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
@@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
         */
        for (i = 0; i < pstate->numWorkers; i++)
        {
-           TocEntry   *running_te;
+           TocEntry   *running_te = pstate->te[i];
 
-           if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
+           if (running_te == NULL)
                continue;
-           running_te = pstate->parallelSlot[i].te;
-
            if (has_lock_conflicts(te, running_te) ||
                has_lock_conflicts(running_te, te))
            {