static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
* Initialize funnel slot to same tuple descriptor as outer plan.
*/
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
/*
* Gather doesn't support checking a qual (it's always more efficient to
PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot;
- HeapTuple tup;
+ MinimalTuple tup;
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{
- if (HeapTupleIsValid(tup))
+ if (tup)
{
- ExecStoreHeapTuple(tup, /* tuple to store */
- fslot, /* slot to store the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ fslot, /* slot to store the tuple */
+ false); /* don't pfree tuple */
return fslot;
}
}
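The third argument flips from true to false here because ownership changed: the MinimalTuple returned by gather_readnext() now points into the shared queue rather than into palloc'd memory, so the slot must not free it. A toy sketch of the two ownership modes (plain standalone C, not PostgreSQL code; all names are made up):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct slot
{
    void *tuple;
    int   should_free;          /* free the tuple when the slot is cleared? */
};

static void
slot_store(struct slot *s, void *tuple, int should_free)
{
    s->tuple = tuple;
    s->should_free = should_free;
}

static void
slot_clear(struct slot *s)
{
    if (s->should_free)
        free(s->tuple);
    s->tuple = NULL;
}

int
main(void)
{
    static char queue_memory[] = "tuple bytes living in the shared queue";
    struct slot s = {0, 0};

    /* Borrow: the queue owns the bytes; the slot must not free them. */
    slot_store(&s, queue_memory, 0);
    printf("%s\n", (char *) s.tuple);
    slot_clear(&s);

    /* Own: a private copy, freed when the slot is cleared. */
    char *copy = malloc(sizeof(queue_memory));
    memcpy(copy, queue_memory, sizeof(queue_memory));
    slot_store(&s, copy, 1);
    slot_clear(&s);
    return 0;
}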
/*
* Attempt to read a tuple from one of our parallel workers.
*/
-static HeapTuple
+static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
int nvisited = 0;
for (;;)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
bool readerdone;
/* Check for async events, particularly messages from workers. */
*/
typedef struct GMReaderTupleBuffer
{
- HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */
+ MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
int nTuples; /* number of tuples currently stored */
int readCounter; /* index of next tuple to extract */
bool done; /* true if reader is known exhausted */
static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
- bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+ bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
{
/* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
- (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+ (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
/* Initialize tuple slot for worker */
gm_state->gm_slots[i + 1] =
ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
}
/* Allocate the resources for the merge */
GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
while (tuple_buffer->readCounter < tuple_buffer->nTuples)
- heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+ pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
ExecClearTuple(gm_state->gm_slots[i + 1]);
}
/* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{
- HeapTuple tuple;
+ MinimalTuple tuple;
tuple = gm_readnext_tuple(gm_state,
reader,
true,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tuple))
+ if (!tuple)
break;
tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++;
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
GMReaderTupleBuffer *tuple_buffer;
- HeapTuple tup;
+ MinimalTuple tup;
/*
* If we're being asked to generate a tuple from the leader, then we just
reader,
nowait,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tup))
+ if (!tup)
return false;
/*
load_tuple_array(gm_state, reader);
}
- Assert(HeapTupleIsValid(tup));
+ Assert(tup);
/* Build the TupleTableSlot for the given tuple */
- ExecStoreHeapTuple(tup, /* tuple to store */
- gm_state->gm_slots[reader], /* slot in which to store
- * the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ gm_state->gm_slots[reader], /* slot in which to store
+ * the tuple */
+ true); /* pfree tuple when done with it */
return true;
}
/*
* Attempt to read a tuple from given worker.
*/
-static HeapTuple
+static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
bool *done)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
- return tup;
+ /*
+ * Since we'll be buffering these across multiple calls, we need to make a
+ * copy.
+ */
+ return tup ? heap_copy_minimal_tuple(tup) : NULL;
}
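Gather can hand the queue pointer straight to its parent because it fetches one tuple at a time, but Gather Merge buffers up to MAX_TUPLE_STORE tuples per reader across calls, and each call to TupleQueueReaderNext() invalidates the previous pointer. That is what the heap_copy_minimal_tuple() above pays for. A standalone toy (not PostgreSQL code) showing the aliasing bug it avoids:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char scratch[32];        /* stands in for the reusable queue buffer */

static const char *
reader_next(int i)
{
    snprintf(scratch, sizeof(scratch), "tuple-%d", i);
    return scratch;             /* valid only until the next call */
}

int
main(void)
{
    const char *aliased[3];
    char       *copied[3];

    for (int i = 0; i < 3; i++)
    {
        const char *tup = reader_next(i);

        aliased[i] = tup;           /* buggy: buffers a soon-stale pointer */
        copied[i] = strdup(tup);    /* correct: take a private copy */
    }

    /* Every aliased entry now reads "tuple-2"; the copies are intact. */
    for (int i = 0; i < 3; i++)
        printf("aliased=%s copied=%s\n", aliased[i], copied[i]);

    for (int i = 0; i < 3; i++)
        free(copied[i]);
    return 0;
}

That private copy is also why gather_merge_readnext() above stores the tuple with shouldFree = true, and why the drain loop can use plain pfree(): the buffered tuples are ordinary palloc'd memory.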
/*
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- HeapTuple tuple;
+ MinimalTuple tuple;
shm_mq_result result;
bool should_free;
/* Send the tuple itself. */
- tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
- result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+ tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+ result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
if (should_free)
- heap_freetuple(tuple);
+ pfree(tuple);
/* Check for failure. */
if (result == SHM_MQ_DETACHED)
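The send call can now pass the tuple pointer itself: a MinimalTuple is one contiguous block whose leading field is its own total length, whereas a HeapTuple is a header plus a separate t_data pointer. A toy model of such a self-delimiting record (plain standalone C with a simplified layout, not the real definitions), including the receiver-side length cross-check:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    uint32_t t_len;     /* total size of this block, including t_len itself */
    char     data[];    /* attribute data follows directly */
} minimal_rec;

static minimal_rec *
make_rec(const char *payload)
{
    size_t       n = strlen(payload) + 1;
    uint32_t     len = (uint32_t) (sizeof(minimal_rec) + n);
    minimal_rec *rec = malloc(len);

    rec->t_len = len;
    memcpy(rec->data, payload, n);
    return rec;
}

/* Receiver side: given (data, nbytes) from the queue, no copy is needed. */
static minimal_rec *
receive_rec(void *data, size_t nbytes)
{
    minimal_rec *rec = data;

    assert(rec->t_len == nbytes);   /* mirrors Assert(tuple->t_len == nbytes) */
    return rec;
}

int
main(void)
{
    minimal_rec *sent = make_rec("hello");

    /* The queue ships sent->t_len bytes starting at sent, as one message. */
    minimal_rec *got = receive_rec(sent, sent->t_len);

    printf("%s (%u bytes)\n", got->data, (unsigned) got->t_len);
    free(sent);
    return 0;
}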
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false.
*
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory! (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed. The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned.
*/
-HeapTuple
+MinimalTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
- HeapTupleData htup;
+ MinimalTuple tuple;
shm_mq_result result;
Size nbytes;
void *data;
Assert(result == SHM_MQ_SUCCESS);
/*
- * Set up a dummy HeapTupleData pointing to the data from the shm_mq
- * (which had better be sufficiently aligned).
+ * Return a pointer to the queue memory directly (which had better be
+ * sufficiently aligned).
*/
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = nbytes;
- htup.t_data = data;
+ tuple = (MinimalTuple) data;
+ Assert(tuple->t_len == nbytes);
- return heap_copytuple(&htup);
+ return tuple;
}
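For callers, the new contract boils down to two rules: poll with nowait = true so that bytes of a partially-received message keep draining even when nothing is returned, and copy the tuple if it must survive the next call. A sketch of a conforming caller, assuming a backend context and an already-created reader (illustrative only; poll_reader is a made-up name):

#include "postgres.h"

#include "executor/tqueue.h"

static MinimalTuple
poll_reader(TupleQueueReader *reader, bool *done)
{
	/*
	 * nowait = true: even when no complete tuple is available, this lets
	 * the reader absorb bytes of a partially-received message.
	 */
	MinimalTuple tup = TupleQueueReaderNext(reader, true, done);

	/*
	 * tup, if not NULL, points into the queue (or the reader's private
	 * buffer) and is only valid until the next call; a caller that keeps
	 * it across calls must heap_copy_minimal_tuple() it, as
	 * gm_readnext_tuple() does.
	 */
	return tup;
}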
List *tlist;
/*
- * Although the Gather node can project, we prefer to push down such work
- * to its child node, so demand an exact tlist from the child.
+ * Push projection down to the child node. That way, the projection work
+ * is parallelized, and there can be no system columns in the result (they
+ * can't travel through a tuple queue because it uses MinimalTuple
+ * representation).
*/
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
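System columns such as ctid are answered from fields of the on-disk tuple header that the minimal tuple header simply does not carry, which is why they cannot travel through the queue. A rough standalone sketch of the two header shapes (field sets abbreviated and simplified; not the real htup_details.h definitions):

#include <stdint.h>
#include <stdio.h>

typedef struct
{
    uint32_t xmin;              /* visibility information ...            */
    uint32_t xmax;
    uint32_t field3;
    uint8_t  t_ctid[6];         /* item pointer: where "ctid" comes from */
    uint16_t t_infomask2;
    uint16_t t_infomask;
    uint8_t  t_hoff;
} toy_heap_tuple_header;

typedef struct
{
    uint32_t t_len;             /* total length; no xmin/xmax, no t_ctid */
    uint16_t t_infomask2;
    uint16_t t_infomask;
    uint8_t  t_hoff;
} toy_minimal_tuple_header;

int
main(void)
{
    printf("heap header: %zu bytes, minimal header: %zu bytes\n",
           sizeof(toy_heap_tuple_header), sizeof(toy_minimal_tuple_header));
    return 0;
}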
List *pathkeys = best_path->path.pathkeys;
List *tlist = build_path_tlist(root, &best_path->path);
- /* As with Gather, it's best to project away columns in the workers. */
+ /* As with Gather, project away columns in the workers. */
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
/* Create a shell for a GatherMerge plan. */
/* Use these to receive tuples from a shm_mq. */
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
- bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+ bool nowait, bool *done);
#endif /* TQUEUE_H */