From c4649cce39a41b27db874e75ddd47adaec1b0ea4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 18 Oct 2021 14:30:00 +0300 Subject: [PATCH] Refactor LogicalTapeSet/LogicalTape interface. All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now take a LogicalTape as argument, instead of LogicalTapeSet+tape number. You can create any number of LogicalTapes in a single LogicalTapeSet, and you don't need to decide the number upfront, when you create the tape set. This makes the tape management in hash agg spilling in nodeAgg.c simpler. Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor --- src/backend/executor/nodeAgg.c | 187 ++++-------- src/backend/utils/sort/logtape.c | 456 ++++++++++++----------------- src/backend/utils/sort/tuplesort.c | 229 +++++++-------- src/include/nodes/execnodes.h | 3 +- src/include/utils/logtape.h | 37 ++- 5 files changed, 359 insertions(+), 553 deletions(-) diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 39bea204d1..c99a0de4dd 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -208,7 +208,16 @@ * * Spilled data is written to logical tapes. These provide better control * over memory usage, disk space, and the number of files than if we were - * to use a BufFile for each spill. + * to use a BufFile for each spill. We don't know the number of tapes needed + * at the start of the algorithm (because it can recurse), so a tape set is + * allocated at the beginning, and individual tapes are created as needed. + * As a particular tape is read, logtape.c recycles its disk space. When a + * tape is read to completion, it is destroyed entirely. + * + * Tapes' buffers can take up substantial memory when many tapes are open at + * once. 
We only need one tape open at a time in read mode (using a buffer + * that's a multiple of BLCKSZ); but we need one tape open in write mode (each + * requiring a buffer of size BLCKSZ) for each partition. * * Note that it's possible for transition states to start small but then * grow very large; for instance in the case of ARRAY_AGG. In such cases, @@ -311,27 +320,6 @@ */ #define CHUNKHDRSZ 16 -/* - * Track all tapes needed for a HashAgg that spills. We don't know the maximum - * number of tapes needed at the start of the algorithm (because it can - * recurse), so one tape set is allocated and extended as needed for new - * tapes. When a particular tape is already read, rewind it for write mode and - * put it in the free list. - * - * Tapes' buffers can take up substantial memory when many tapes are open at - * once. We only need one tape open at a time in read mode (using a buffer - * that's a multiple of BLCKSZ); but we need one tape open in write mode (each - * requiring a buffer of size BLCKSZ) for each partition. - */ -typedef struct HashTapeInfo -{ - LogicalTapeSet *tapeset; - int ntapes; - int *freetapes; - int nfreetapes; - int freetapes_alloc; -} HashTapeInfo; - /* * Represents partitioned spill data for a single hashtable. 
Contains the * necessary information to route tuples to the correct partition, and to @@ -343,9 +331,8 @@ typedef struct HashTapeInfo */ typedef struct HashAggSpill { - LogicalTapeSet *tapeset; /* borrowed reference to tape set */ int npartitions; /* number of partitions */ - int *partitions; /* spill partition tape numbers */ + LogicalTape **partitions; /* spill partition tapes */ int64 *ntuples; /* number of tuples in each partition */ uint32 mask; /* mask to find partition from hash value */ int shift; /* after masking, shift by this amount */ @@ -365,8 +352,7 @@ typedef struct HashAggBatch { int setno; /* grouping set */ int used_bits; /* number of bits of hash already used */ - LogicalTapeSet *tapeset; /* borrowed reference to tape set */ - int input_tapenum; /* input partition tape */ + LogicalTape *input_tape; /* input partition tape */ int64 input_tuples; /* number of tuples in this batch */ double input_card; /* estimated group cardinality */ } HashAggBatch; @@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions); static void hashagg_finish_initial_spills(AggState *aggstate); static void hashagg_reset_spill_state(AggState *aggstate); -static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, - int input_tapenum, int setno, +static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); -static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, +static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts, int used_bits, double input_groups, double hashentrysize); static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash); static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno); -static void hashagg_tapeinfo_init(AggState *aggstate); -static void 
hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, - int ndest); -static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, @@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate) if (!aggstate->hash_ever_spilled) { - Assert(aggstate->hash_tapeinfo == NULL); + Assert(aggstate->hash_tapeset == NULL); Assert(aggstate->hash_spills == NULL); aggstate->hash_ever_spilled = true; - hashagg_tapeinfo_init(aggstate); + aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1); aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes); @@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate) AggStatePerHash perhash = &aggstate->perhash[setno]; HashAggSpill *spill = &aggstate->hash_spills[setno]; - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, aggstate->hashentrysize); } @@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) aggstate->hash_mem_peak = total_mem; /* update disk usage */ - if (aggstate->hash_tapeinfo != NULL) + if (aggstate->hash_tapeset != NULL) { - uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024); + uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024); if (aggstate->hash_disk_used < disk_used) aggstate->hash_disk_used = disk_used; @@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate) TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; if (spill->partitions == NULL) - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, aggstate->hashentrysize); @@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate) HashAggBatch 
*batch; AggStatePerHash perhash; HashAggSpill spill; - HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + LogicalTapeSet *tapeset = aggstate->hash_tapeset; bool spill_initialized = false; if (aggstate->hash_batches == NIL) @@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate) * that we don't assign tapes that will never be used. */ spill_initialized = true; - hashagg_spill_init(&spill, tapeinfo, batch->used_bits, + hashagg_spill_init(&spill, tapeset, batch->used_bits, batch->input_card, aggstate->hashentrysize); } /* no memory for a new group, spill */ @@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate) ResetExprContext(aggstate->tmpcontext); } - hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); + LogicalTapeClose(batch->input_tape); /* change back to phase 0 */ aggstate->current_phase = 0; @@ -2884,67 +2865,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) return NULL; } -/* - * Initialize HashTapeInfo - */ -static void -hashagg_tapeinfo_init(AggState *aggstate) -{ - HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo)); - int init_tapes = 16; /* expanded dynamically */ - - tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1); - tapeinfo->ntapes = init_tapes; - tapeinfo->nfreetapes = init_tapes; - tapeinfo->freetapes_alloc = init_tapes; - tapeinfo->freetapes = palloc(init_tapes * sizeof(int)); - for (int i = 0; i < init_tapes; i++) - tapeinfo->freetapes[i] = i; - - aggstate->hash_tapeinfo = tapeinfo; -} - -/* - * Assign unused tapes to spill partitions, extending the tape set if - * necessary. 
- */ -static void -hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions, - int npartitions) -{ - int partidx = 0; - - /* use free tapes if available */ - while (partidx < npartitions && tapeinfo->nfreetapes > 0) - partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes]; - - if (partidx < npartitions) - { - LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx); - - while (partidx < npartitions) - partitions[partidx++] = tapeinfo->ntapes++; - } -} - -/* - * After a tape has already been written to and then read, this function - * rewinds it for writing and adds it to the free list. - */ -static void -hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) -{ - /* rewinding frees the buffer while not in use */ - LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum); - if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes) - { - tapeinfo->freetapes_alloc <<= 1; - tapeinfo->freetapes = repalloc(tapeinfo->freetapes, - tapeinfo->freetapes_alloc * sizeof(int)); - } - tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum; -} - /* * hashagg_spill_init * @@ -2952,7 +2872,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) * of partitions to create, and initializes them. 
*/ static void -hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, +hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, double input_groups, double hashentrysize) { int npartitions; @@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, npartitions = hash_choose_num_partitions(input_groups, hashentrysize, used_bits, &partition_bits); - spill->partitions = palloc0(sizeof(int) * npartitions); + spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions); spill->ntuples = palloc0(sizeof(int64) * npartitions); spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions); - hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions); + for (int i = 0; i < npartitions; i++) + spill->partitions[i] = LogicalTapeCreate(tapeset); - spill->tapeset = tapeinfo->tapeset; spill->shift = 32 - used_bits - partition_bits; spill->mask = (npartitions - 1) << spill->shift; spill->npartitions = npartitions; @@ -2986,11 +2906,10 @@ static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *inputslot, uint32 hash) { - LogicalTapeSet *tapeset = spill->tapeset; TupleTableSlot *spillslot; int partition; MinimalTuple tuple; - int tapenum; + LogicalTape *tape; int total_written = 0; bool shouldFree; @@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, */ addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash)); - tapenum = spill->partitions[partition]; + tape = spill->partitions[partition]; - LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32)); + LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32)); total_written += sizeof(uint32); - LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len); + LogicalTapeWrite(tape, (void *) tuple, tuple->t_len); total_written += tuple->t_len; if (shouldFree) @@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, * 
be done. */ static HashAggBatch * -hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, +hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits) { HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); batch->setno = setno; batch->used_bits = used_bits; - batch->tapeset = tapeset; - batch->input_tapenum = tapenum; + batch->input_tape = input_tape; batch->input_tuples = input_tuples; batch->input_card = input_card; @@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) { - LogicalTapeSet *tapeset = batch->tapeset; - int tapenum = batch->input_tapenum; + LogicalTape *tape = batch->input_tape; MinimalTuple tuple; uint32 t_len; size_t nread; uint32 hash; - nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32)); + nread = LogicalTapeRead(tape, &hash, sizeof(uint32)); if (nread == 0) return NULL; if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, sizeof(uint32), nread))); if (hashp != NULL) *hashp = hash; - nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len)); + nread = LogicalTapeRead(tape, &t_len, sizeof(t_len)); if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, sizeof(uint32), nread))); tuple = (MinimalTuple) palloc(t_len); tuple->t_len = t_len; - nread = LogicalTapeRead(tapeset, tapenum, + nread = LogicalTapeRead(tape, (void *) ((char *) tuple + sizeof(uint32)), t_len - sizeof(uint32)); if (nread != t_len - sizeof(uint32)) ereport(ERROR, 
(errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, t_len - sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, t_len - sizeof(uint32), nread))); return tuple; } @@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) for (i = 0; i < spill->npartitions; i++) { - LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset; - int tapenum = spill->partitions[i]; + LogicalTape *tape = spill->partitions[i]; HashAggBatch *new_batch; double cardinality; @@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) freeHyperLogLog(&spill->hll_card[i]); /* rewinding frees the buffer while not in use */ - LogicalTapeRewindForRead(tapeset, tapenum, - HASHAGG_READ_BUFFER_SIZE); + LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE); - new_batch = hashagg_batch_new(tapeset, tapenum, setno, + new_batch = hashagg_batch_new(tape, setno, spill->ntuples[i], cardinality, used_bits); aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches); @@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate) aggstate->hash_batches = NIL; /* close tape set */ - if (aggstate->hash_tapeinfo != NULL) + if (aggstate->hash_tapeset != NULL) { - HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; - - LogicalTapeSetClose(tapeinfo->tapeset); - pfree(tapeinfo->freetapes); - pfree(tapeinfo); - aggstate->hash_tapeinfo = NULL; + LogicalTapeSetClose(aggstate->hash_tapeset); + aggstate->hash_tapeset = NULL; } } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index debf12e1b0..6d7f862fb5 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -9,8 +9,7 @@ * there is an annoying problem: the peak space usage is at least twice * the volume of actual data to be sorted. 
(This must be so because each * datum will appear in both the input and output tapes of the final - * merge pass. For seven-tape polyphase merge, which is otherwise a - * pretty good algorithm, peak usage is more like 4x actual data volume.) + * merge pass.) * * We can work around this problem by recognizing that any one tape * dataset (with the possible exception of the final output) is written @@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer */ typedef struct LogicalTape { + LogicalTapeSet *tapeSet; /* tape set this tape is part of */ + bool writing; /* T while in write phase */ bool frozen; /* T if blocks should not be freed when read */ bool dirty; /* does buffer need to be written? */ @@ -180,11 +181,14 @@ typedef struct LogicalTape * This data structure represents a set of related "logical tapes" sharing * space in a single underlying file. (But that "file" may be multiple files * if needed to escape OS limits on file size; buffile.c handles that for us.) - * The number of tapes is fixed at creation. + * Tapes belonging to a tape set can be created and destroyed on-the-fly, on + * demand. */ struct LogicalTapeSet { BufFile *pfile; /* underlying file for whole tape set */ + SharedFileSet *fileset; + int worker; /* worker # if shared, -1 for leader/serial */ /* * File size tracking. nBlocksWritten is the size of the underlying file, @@ -213,22 +217,16 @@ struct LogicalTapeSet long nFreeBlocks; /* # of currently free blocks */ Size freeBlocksLen; /* current allocated length of freeBlocks[] */ bool enable_prealloc; /* preallocate write blocks? */ - - /* The array of logical tapes. 
*/ - int nTapes; /* # of logical tapes in set */ - LogicalTape *tapes; /* has nTapes nentries */ }; +static LogicalTape *ltsCreateTape(LogicalTapeSet *lts); static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt); static long ltsGetFreeBlock(LogicalTapeSet *lts); static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt); static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum); -static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, - SharedFileSet *fileset); -static void ltsInitTape(LogicalTape *lt); -static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt); +static void ltsInitReadBuffer(LogicalTape *lt); /* @@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) * Returns true if anything was read, 'false' on EOF. */ static bool -ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt) +ltsReadFillBuffer(LogicalTape *lt) { lt->pos = 0; lt->nbytes = 0; @@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt) datablocknum += lt->offsetBlockNumber; /* Read the block */ - ltsReadBlock(lts, datablocknum, (void *) thisbuf); + ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf); if (!lt->frozen) - ltsReleaseBlock(lts, datablocknum); + ltsReleaseBlock(lt->tapeSet, datablocknum); lt->curBlockNumber = lt->nextBlockNumber; lt->nbytes += TapeBlockGetNBytes(thisbuf); @@ -530,126 +528,13 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum) } } -/* - * Claim ownership of a set of logical tapes from existing shared BufFiles. - * - * Caller should be leader process. Though tapes are marked as frozen in - * workers, they are not frozen when opened within leader, since unfrozen tapes - * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized - * for random access.) 
- */ -static void -ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, - SharedFileSet *fileset) -{ - LogicalTape *lt = NULL; - long tapeblocks = 0L; - long nphysicalblocks = 0L; - int i; - - /* Should have at least one worker tape, plus leader's tape */ - Assert(lts->nTapes >= 2); - - /* - * Build concatenated view of all BufFiles, remembering the block number - * where each source file begins. No changes are needed for leader/last - * tape. - */ - for (i = 0; i < lts->nTapes - 1; i++) - { - char filename[MAXPGPATH]; - BufFile *file; - int64 filesize; - - lt = &lts->tapes[i]; - - pg_itoa(i, filename); - file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); - filesize = BufFileSize(file); - - /* - * Stash first BufFile, and concatenate subsequent BufFiles to that. - * Store block offset into each tape as we go. - */ - lt->firstBlockNumber = shared[i].firstblocknumber; - if (i == 0) - { - lts->pfile = file; - lt->offsetBlockNumber = 0L; - } - else - { - lt->offsetBlockNumber = BufFileAppend(lts->pfile, file); - } - /* Don't allocate more for read buffer than could possibly help */ - lt->max_size = Min(MaxAllocSize, filesize); - tapeblocks = filesize / BLCKSZ; - nphysicalblocks += tapeblocks; - } - - /* - * Set # of allocated blocks, as well as # blocks written. Use extent of - * new BufFile space (from 0 to end of last worker's tape space) for this. - * Allocated/written blocks should include space used by holes left - * between concatenated BufFiles. - */ - lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks; - lts->nBlocksWritten = lts->nBlocksAllocated; - - /* - * Compute number of hole blocks so that we can later work backwards, and - * instrument number of physical blocks. We don't simply use physical - * blocks directly for instrumentation because this would break if we ever - * subsequently wrote to the leader tape. - * - * Working backwards like this keeps our options open. 
If shared BufFiles - * ever support being written to post-export, logtape.c can automatically - * take advantage of that. We'd then support writing to the leader tape - * while recycling space from worker tapes, because the leader tape has a - * zero offset (write routines won't need to have extra logic to apply an - * offset). - * - * The only thing that currently prevents writing to the leader tape from - * working is the fact that BufFiles opened using BufFileOpenFileSet() are - * read-only by definition, but that could be changed if it seemed - * worthwhile. For now, writing to the leader tape will raise a "Bad file - * descriptor" error, so tuplesort must avoid writing to the leader tape - * altogether. - */ - lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks; -} - -/* - * Initialize per-tape struct. Note we allocate the I/O buffer lazily. - */ -static void -ltsInitTape(LogicalTape *lt) -{ - lt->writing = true; - lt->frozen = false; - lt->dirty = false; - lt->firstBlockNumber = -1L; - lt->curBlockNumber = -1L; - lt->nextBlockNumber = -1L; - lt->offsetBlockNumber = 0L; - lt->buffer = NULL; - lt->buffer_size = 0; - /* palloc() larger than MaxAllocSize would fail */ - lt->max_size = MaxAllocSize; - lt->pos = 0; - lt->nbytes = 0; - lt->prealloc = NULL; - lt->nprealloc = 0; - lt->prealloc_size = 0; -} - /* * Lazily allocate and initialize the read buffer. This avoids waste when many * tapes are open at once, but not all are active between rewinding and * reading. */ static void -ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt) +ltsInitReadBuffer(LogicalTape *lt) { Assert(lt->buffer_size > 0); lt->buffer = palloc(lt->buffer_size); @@ -658,40 +543,32 @@ ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt) lt->nextBlockNumber = lt->firstBlockNumber; lt->pos = 0; lt->nbytes = 0; - ltsReadFillBuffer(lts, lt); + ltsReadFillBuffer(lt); } /* - * Create a set of logical tapes in a temporary underlying file. 
+ * Create a tape set, backed by a temporary underlying file. * - * Each tape is initialized in write state. Serial callers pass ntapes, - * NULL argument for shared, and -1 for worker. Parallel worker callers - * pass ntapes, a shared file handle, NULL shared argument, and their own - * worker number. Leader callers, which claim shared worker tapes here, - * must supply non-sentinel values for all arguments except worker number, - * which should be -1. + * The tape set is initially empty. Use LogicalTapeCreate() to create + * tapes in it. * - * Leader caller is passing back an array of metadata each worker captured - * when LogicalTapeFreeze() was called for their final result tapes. Passed - * tapes array is actually sized ntapes - 1, because it includes only - * worker tapes, whereas leader requires its own leader tape. Note that we - * rely on the assumption that reclaimed worker tapes will only be read - * from once by leader, and never written to again (tapes are initialized - * for writing, but that's only to be consistent). Leader may not write to - * its own tape purely due to a restriction in the shared buffile - * infrastructure that may be lifted in the future. + * Serial callers pass NULL argument for shared, and -1 for worker. Parallel + * worker callers pass a shared file handle and their own worker number. + * + * Leader callers pass a shared file handle and -1 for worker. After creating + * the tape set, use LogicalTapeImport() to import the worker tapes into it. + * + * Currently, the leader will only import worker tapes into the set, it does + * not create tapes of its own, although in principle that should work. */ LogicalTapeSet * -LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, - SharedFileSet *fileset, int worker) +LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker) { LogicalTapeSet *lts; - int i; /* * Create top-level struct including per-tape LogicalTape structs. 
*/ - Assert(ntapes > 0); lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet)); lts->nBlocksAllocated = 0L; lts->nBlocksWritten = 0L; @@ -701,22 +578,21 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long)); lts->nFreeBlocks = 0; lts->enable_prealloc = preallocate; - lts->nTapes = ntapes; - lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape)); - for (i = 0; i < ntapes; i++) - ltsInitTape(&lts->tapes[i]); + lts->fileset = fileset; + lts->worker = worker; /* * Create temp BufFile storage as required. * - * Leader concatenates worker tapes, which requires special adjustment to - * final tapeset data. Things are simpler for the worker case and the + * In leader, we hijack the BufFile of the first tape that's imported, and + * concatenate the BufFiles of any subsequent tapes to that. Hence don't + * create a BufFile here. Things are simpler for the worker case and the * serial case, though. They are generally very similar -- workers use a * shared fileset, whereas serial sorts use a conventional serial BufFile. */ - if (shared) - ltsConcatWorkerTapes(lts, shared, fileset); + if (fileset && worker == -1) + lts->pfile = NULL; else if (fileset) { char filename[MAXPGPATH]; @@ -731,26 +607,145 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, } /* - * Close a logical tape set and release all resources. + * Claim ownership of a logical tape from an existing shared BufFile. + * + * Caller should be leader process. Though tapes are marked as frozen in + * workers, they are not frozen when opened within leader, since unfrozen tapes + * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized + * for random access.) 
*/ -void -LogicalTapeSetClose(LogicalTapeSet *lts) +LogicalTape * +LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared) { LogicalTape *lt; - int i; + long tapeblocks; + char filename[MAXPGPATH]; + BufFile *file; + int64 filesize; - BufFileClose(lts->pfile); - for (i = 0; i < lts->nTapes; i++) + lt = ltsCreateTape(lts); + + /* + * build concatenated view of all buffiles, remembering the block number + * where each source file begins. + */ + pg_itoa(worker, filename); + file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false); + filesize = BufFileSize(file); + + /* + * Stash first BufFile, and concatenate subsequent BufFiles to that. Store + * block offset into each tape as we go. + */ + lt->firstBlockNumber = shared->firstblocknumber; + if (lts->pfile == NULL) { - lt = &lts->tapes[i]; - if (lt->buffer) - pfree(lt->buffer); + lts->pfile = file; + lt->offsetBlockNumber = 0L; } - pfree(lts->tapes); + else + { + lt->offsetBlockNumber = BufFileAppend(lts->pfile, file); + } + /* Don't allocate more for read buffer than could possibly help */ + lt->max_size = Min(MaxAllocSize, filesize); + tapeblocks = filesize / BLCKSZ; + + /* + * Update # of allocated blocks and # blocks written to reflect the + * imported BufFile. Allocated/written blocks include space used by holes + * left between concatenated BufFiles. Also track the number of hole + * blocks so that we can later work backwards to calculate the number of + * physical blocks for instrumentation. + */ + lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated; + + lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks; + lts->nBlocksWritten = lts->nBlocksAllocated; + + return lt; +} + +/* + * Close a logical tape set and release all resources. + * + * NOTE: This doesn't close any of the tapes! You must close them + * first, or you can let them be destroyed along with the memory context. 
+ */ +void +LogicalTapeSetClose(LogicalTapeSet *lts) +{ + BufFileClose(lts->pfile); pfree(lts->freeBlocks); pfree(lts); } +/* + * Create a logical tape in the given tapeset. + * + * The tape is initialized in write state. + */ +LogicalTape * +LogicalTapeCreate(LogicalTapeSet *lts) +{ + /* + * The only thing that currently prevents creating new tapes in leader is + * the fact that BufFiles opened using BufFileOpenShared() are read-only + * by definition, but that could be changed if it seemed worthwhile. For + * now, writing to the leader tape will raise a "Bad file descriptor" + * error, so tuplesort must avoid writing to the leader tape altogether. + */ + if (lts->fileset && lts->worker == -1) + elog(ERROR, "cannot create new tapes in leader process"); + + return ltsCreateTape(lts); +} + +static LogicalTape * +ltsCreateTape(LogicalTapeSet *lts) +{ + LogicalTape *lt; + + /* + * Create per-tape struct. Note we allocate the I/O buffer lazily. + */ + lt = palloc(sizeof(LogicalTape)); + lt->tapeSet = lts; + lt->writing = true; + lt->frozen = false; + lt->dirty = false; + lt->firstBlockNumber = -1L; + lt->curBlockNumber = -1L; + lt->nextBlockNumber = -1L; + lt->offsetBlockNumber = 0L; + lt->buffer = NULL; + lt->buffer_size = 0; + /* palloc() larger than MaxAllocSize would fail */ + lt->max_size = MaxAllocSize; + lt->pos = 0; + lt->nbytes = 0; + lt->prealloc = NULL; + lt->nprealloc = 0; + lt->prealloc_size = 0; + + return lt; +} + +/* + * Close a logical tape. + * + * Note: This doesn't return any blocks to the free list! You must read + * the tape to the end first, to reuse the space. In current use, though, + * we only close tapes after fully reading them. + */ +void +LogicalTapeClose(LogicalTape *lt) +{ + if (lt->buffer) + pfree(lt->buffer); + pfree(lt); +} + /* * Mark a logical tape set as not needing management of free space anymore. * @@ -772,14 +767,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts) * There are no error returns; we ereport() on failure. 
*/ void -LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, - void *ptr, size_t size) +LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size) { - LogicalTape *lt; + LogicalTapeSet *lts = lt->tapeSet; size_t nthistime; - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; Assert(lt->writing); Assert(lt->offsetBlockNumber == 0L); @@ -818,11 +810,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, * First allocate the next block, so that we can store it in the * 'next' pointer of this block. */ - nextBlockNumber = ltsGetBlock(lts, lt); + nextBlockNumber = ltsGetBlock(lt->tapeSet, lt); /* set the next-pointer and dump the current block. */ TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber; - ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer); + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); /* initialize the prev-pointer of the next block */ TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber; @@ -860,12 +852,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, * byte buffer is used. */ void -LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size) +LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size) { - LogicalTape *lt; - - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; + LogicalTapeSet *lts = lt->tapeSet; /* * Round and cap buffer_size if needed. @@ -907,7 +896,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size) lt->buffer_size - lt->nbytes); TapeBlockSetNBytes(lt->buffer, lt->nbytes); - ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer); + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); } lt->writing = false; } @@ -939,61 +928,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size) } } -/* - * Rewind logical tape and switch from reading to writing. 
- * - * NOTE: we assume the caller has read the tape to the end; otherwise - * untouched data will not have been freed. We could add more code to free - * any unread blocks, but in current usage of this module it'd be useless - * code. - */ -void -LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum) -{ - LogicalTape *lt; - - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; - - Assert(!lt->writing && !lt->frozen); - lt->writing = true; - lt->dirty = false; - lt->firstBlockNumber = -1L; - lt->curBlockNumber = -1L; - lt->pos = 0; - lt->nbytes = 0; - if (lt->buffer) - pfree(lt->buffer); - lt->buffer = NULL; - lt->buffer_size = 0; -} - /* * Read from a logical tape. * * Early EOF is indicated by return value less than #bytes requested. */ size_t -LogicalTapeRead(LogicalTapeSet *lts, int tapenum, - void *ptr, size_t size) +LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size) { - LogicalTape *lt; size_t nread = 0; size_t nthistime; - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; Assert(!lt->writing); if (lt->buffer == NULL) - ltsInitReadBuffer(lts, lt); + ltsInitReadBuffer(lt); while (size > 0) { if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ - if (!ltsReadFillBuffer(lts, lt)) + if (!ltsReadFillBuffer(lt)) break; /* EOF */ } @@ -1031,12 +987,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum, * Serial sorts should set share to NULL.
*/ void -LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) +LogicalTapeFreeze(LogicalTape *lt, TapeShare *share) { - LogicalTape *lt; + LogicalTapeSet *lts = lt->tapeSet; - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; Assert(lt->writing); Assert(lt->offsetBlockNumber == 0L); @@ -1058,8 +1012,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) lt->buffer_size - lt->nbytes); TapeBlockSetNBytes(lt->buffer, lt->nbytes); - ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer); - lt->writing = false; + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); } lt->writing = false; lt->frozen = true; @@ -1086,7 +1039,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) if (lt->firstBlockNumber == -1L) lt->nextBlockNumber = -1L; - ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer); + ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); if (TapeBlockIsLast(lt->buffer)) lt->nextBlockNumber = -1L; else @@ -1101,25 +1054,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) } } -/* - * Add additional tapes to this tape set. Not intended to be used when any - * tapes are frozen. - */ -void -LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional) -{ - int i; - int nTapesOrig = lts->nTapes; - - lts->nTapes += nAdditional; - - lts->tapes = (LogicalTape *) repalloc(lts->tapes, - lts->nTapes * sizeof(LogicalTape)); - - for (i = nTapesOrig; i < lts->nTapes; i++) - ltsInitTape(&lts->tapes[i]); -} - /* * Backspace the tape a given number of bytes. (We also support a more * general seek interface, see below.) @@ -1134,18 +1068,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional) * that case.
*/ size_t -LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) +LogicalTapeBackspace(LogicalTape *lt, size_t size) { - LogicalTape *lt; size_t seekpos = 0; - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; Assert(lt->frozen); Assert(lt->buffer_size == BLCKSZ); if (lt->buffer == NULL) - ltsInitReadBuffer(lts, lt); + ltsInitReadBuffer(lt); /* * Easy case for seek within current block. @@ -1175,7 +1106,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) return seekpos; } - ltsReadBlock(lts, prev, (void *) lt->buffer); + ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer); if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber) elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld", @@ -1208,23 +1139,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) * LogicalTapeTell(). */ void -LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, - long blocknum, int offset) +LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset) { - LogicalTape *lt; - - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; Assert(lt->frozen); Assert(offset >= 0 && offset <= TapeBlockPayloadSize); Assert(lt->buffer_size == BLCKSZ); if (lt->buffer == NULL) - ltsInitReadBuffer(lts, lt); + ltsInitReadBuffer(lt); if (blocknum != lt->curBlockNumber) { - ltsReadBlock(lts, blocknum, (void *) lt->buffer); + ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer); lt->curBlockNumber = blocknum; lt->nbytes = TapeBlockPayloadSize; lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next; @@ -1242,16 +1168,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, * the position for a seek after freezing. Not clear if anyone needs that.
*/ void -LogicalTapeTell(LogicalTapeSet *lts, int tapenum, - long *blocknum, int *offset) +LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset) { - LogicalTape *lt; - - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; - if (lt->buffer == NULL) - ltsInitReadBuffer(lts, lt); + ltsInitReadBuffer(lt); Assert(lt->offsetBlockNumber == 0L); @@ -1271,13 +1191,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long LogicalTapeSetBlocks(LogicalTapeSet *lts) { -#ifdef USE_ASSERT_CHECKING - for (int i = 0; i < lts->nTapes; i++) - { - LogicalTape *lt = &lts->tapes[i]; - - Assert(!lt->writing || lt->buffer == NULL); - } -#endif return lts->nBlocksWritten - lts->nHoleBlocks; } diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index b17347b214..d5930f258d 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -262,6 +262,7 @@ struct Tuplesortstate MemoryContext sortcontext; /* memory context holding most sort data */ MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ + LogicalTape **tapes; /* * These function pointers decouple the routines that must know what kind @@ -290,7 +291,7 @@ struct Tuplesortstate * SortTuple struct!), and increase state->availMem by the amount of * memory space thereby released. */ - void (*writetup) (Tuplesortstate *state, int tapenum, + void (*writetup) (Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); /* @@ -299,7 +300,7 @@ struct Tuplesortstate * from the slab memory arena, or is palloc'd, see readtup_alloc(). */ void (*readtup) (Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); /* * This array holds the tuples now in sort memory. If we are in state @@ -393,7 +394,7 @@ struct Tuplesortstate * the next tuple to return.
(In the tape case, the tape's current read * position is also critical state.) */ - int result_tape; /* actual tape number of finished output */ + LogicalTape *result_tape; /* tape of finished output */ int current; /* array index (only used if SORTEDINMEM) */ bool eof_reached; /* reached EOF (needed for cursors) */ @@ -599,9 +600,9 @@ struct Sharedsort */ /* When using this macro, beware of double evaluation of len */ -#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \ +#define LogicalTapeReadExact(tape, ptr, len) \ do { \ - if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \ + if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \ elog(ERROR, "unexpected end of data"); \ } while(0) @@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); -static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); +static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup); static void dumptuples(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); @@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_delete_top(Tuplesortstate *state); static void reversedirection(Tuplesortstate *state); -static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); -static void markrunend(Tuplesortstate *state, int tapenum); +static unsigned int getlen(LogicalTape *tape, bool eofOK); +static void markrunend(LogicalTape *tape); static void *readtup_alloc(Tuplesortstate *state, Size tuplen); static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void 
copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_heap(Tuplesortstate *state, int tapenum, +static void writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_cluster(Tuplesortstate *state, int tapenum, +static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_index(Tuplesortstate *state, int tapenum, +static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_datum(Tuplesortstate *state, int tapenum, +static void writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int worker_get_identifier(Tuplesortstate *state); static void worker_freeze_result_tape(Tuplesortstate *state); static void 
worker_nomergeruns(Tuplesortstate *state); @@ -888,7 +889,7 @@ tuplesort_begin_batch(Tuplesortstate *state) * inittapes(), if needed */ - state->result_tape = -1; /* flag that result tape has not been formed */ + state->result_tape = NULL; /* flag that result tape has not been formed */ MemoryContextSwitchTo(oldcontext); } @@ -2221,7 +2222,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, if (state->eof_reached) return false; - if ((tuplen = getlen(state, state->result_tape, true)) != 0) + if ((tuplen = getlen(state->result_tape, true)) != 0) { READTUP(state, stup, state->result_tape, tuplen); @@ -2254,8 +2255,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * end of file; back up to fetch last tuple's ending length * word. If seek fails we must have a completely empty file. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, 2 * sizeof(unsigned int)); if (nmoved == 0) return false; @@ -2269,20 +2269,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * Back up and fetch previously-returned tuple's ending length * word. If seek fails, assume we are at start of file. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, sizeof(unsigned int)); if (nmoved == 0) return false; else if (nmoved != sizeof(unsigned int)) elog(ERROR, "unexpected tape position"); - tuplen = getlen(state, state->result_tape, false); + tuplen = getlen(state->result_tape, false); /* * Back up to get ending length word of tuple before it. 
*/ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, tuplen + 2 * sizeof(unsigned int)); if (nmoved == tuplen + sizeof(unsigned int)) { @@ -2299,15 +2297,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, elog(ERROR, "bogus tuple length in backward scan"); } - tuplen = getlen(state, state->result_tape, false); + tuplen = getlen(state->result_tape, false); /* * Now we have the length of the prior tuple, back up and read it. * Note: READTUP expects we are positioned after the initial * length word of the tuple, so back up to that point. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, tuplen); if (nmoved != tuplen) elog(ERROR, "bogus tuple length in backward scan"); @@ -2365,11 +2362,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, tuplesort_heap_delete_top(state); /* - * Rewind to free the read buffer. It'd go away at the - * end of the sort anyway, but better to release the - * memory early. + * Close the tape. It'd go away at the end of the sort + * anyway, but better to release the memory early. */ - LogicalTapeRewindForWrite(state->tapeset, srcTape); + LogicalTapeClose(state->tapes[srcTape]); return true; } newtup.srctape = srcTape; @@ -2667,9 +2663,12 @@ inittapes(Tuplesortstate *state, bool mergeruns) /* Create the tape set and allocate the per-tape data arrays */ inittapestate(state, maxTapes); state->tapeset = - LogicalTapeSetCreate(maxTapes, false, NULL, + LogicalTapeSetCreate(false, state->shared ? 
&state->shared->fileset : NULL, state->worker); + state->tapes = palloc(maxTapes * sizeof(LogicalTape *)); + for (j = 0; j < maxTapes; j++) + state->tapes[j] = LogicalTapeCreate(state->tapeset); state->currentRun = 0; @@ -2919,7 +2918,7 @@ mergeruns(Tuplesortstate *state) /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); + LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size); for (;;) { @@ -2981,11 +2980,14 @@ mergeruns(Tuplesortstate *state) /* Step D6: decrease level */ if (--state->Level == 0) break; + /* rewind output tape T to use as new input */ - LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], + LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]], state->read_buffer_size); - /* rewind used-up input tape P, and prepare it for write pass */ - LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); + + /* close used-up input tape P, and create a new one for write pass */ + LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]); + state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset); state->tp_runs[state->tapeRange - 1] = 0; /* @@ -3013,18 +3015,21 @@ mergeruns(Tuplesortstate *state) * output tape while rewinding it. The last iteration of step D6 would be * a waste of cycles anyway... */ - state->result_tape = state->tp_tapenum[state->tapeRange]; + state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]]; if (!WORKER(state)) - LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); + LogicalTapeFreeze(state->result_tape, NULL); else worker_freeze_result_tape(state); state->status = TSS_SORTEDONTAPE; - /* Release the read buffers of all the other tapes, by rewinding them. */ + /* Close all the other tapes, to release their read buffers. 
*/ for (tapenum = 0; tapenum < state->maxTapes; tapenum++) { - if (tapenum != state->result_tape) - LogicalTapeRewindForWrite(state->tapeset, tapenum); + if (state->tapes[tapenum] != state->result_tape) + { + LogicalTapeClose(state->tapes[tapenum]); + state->tapes[tapenum] = NULL; + } } } @@ -3037,7 +3042,8 @@ mergeruns(Tuplesortstate *state) static void mergeonerun(Tuplesortstate *state) { - int destTape = state->tp_tapenum[state->tapeRange]; + int destTapeNum = state->tp_tapenum[state->tapeRange]; + LogicalTape *destTape = state->tapes[destTapeNum]; int srcTape; /* @@ -3080,7 +3086,7 @@ mergeonerun(Tuplesortstate *state) * When the heap empties, we're done. Write an end-of-run marker on the * output tape, and increment its count of real runs. */ - markrunend(state, destTape); + markrunend(destTape); state->tp_runs[state->tapeRange]++; #ifdef TRACE_SORT @@ -3146,17 +3152,18 @@ beginmerge(Tuplesortstate *state) * Returns false on EOF. */ static bool -mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) +mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup) { + LogicalTape *srcTape = state->tapes[srcTapeIndex]; unsigned int tuplen; - if (!state->mergeactive[srcTape]) + if (!state->mergeactive[srcTapeIndex]) return false; /* tape's run is already exhausted */ /* read next tuple, if any */ - if ((tuplen = getlen(state, srcTape, true)) == 0) + if ((tuplen = getlen(srcTape, true)) == 0) { - state->mergeactive[srcTape] = false; + state->mergeactive[srcTapeIndex] = false; return false; } READTUP(state, stup, srcTape, tuplen); @@ -3173,6 +3180,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) static void dumptuples(Tuplesortstate *state, bool alltuples) { + LogicalTape *destTape; int memtupwrite; int i; @@ -3239,10 +3247,10 @@ dumptuples(Tuplesortstate *state, bool alltuples) #endif memtupwrite = state->memtupcount; + destTape = state->tapes[state->tp_tapenum[state->destTape]]; for (i = 0; i < memtupwrite; i++) { - 
WRITETUP(state, state->tp_tapenum[state->destTape], - &state->memtuples[i]); + WRITETUP(state, destTape, &state->memtuples[i]); state->memtupcount--; } @@ -3255,7 +3263,7 @@ dumptuples(Tuplesortstate *state, bool alltuples) */ MemoryContextReset(state->tuplecontext); - markrunend(state, state->tp_tapenum[state->destTape]); + markrunend(destTape); state->tp_runs[state->destTape]++; state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ @@ -3289,9 +3297,7 @@ tuplesort_rescan(Tuplesortstate *state) state->markpos_eof = false; break; case TSS_SORTEDONTAPE: - LogicalTapeRewindForRead(state->tapeset, - state->result_tape, - 0); + LogicalTapeRewindForRead(state->result_tape, 0); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; @@ -3322,8 +3328,7 @@ tuplesort_markpos(Tuplesortstate *state) state->markpos_eof = state->eof_reached; break; case TSS_SORTEDONTAPE: - LogicalTapeTell(state->tapeset, - state->result_tape, + LogicalTapeTell(state->result_tape, &state->markpos_block, &state->markpos_offset); state->markpos_eof = state->eof_reached; @@ -3354,8 +3359,7 @@ tuplesort_restorepos(Tuplesortstate *state) state->eof_reached = state->markpos_eof; break; case TSS_SORTEDONTAPE: - LogicalTapeSeek(state->tapeset, - state->result_tape, + LogicalTapeSeek(state->result_tape, state->markpos_block, state->markpos_offset); state->eof_reached = state->markpos_eof; @@ -3697,11 +3701,11 @@ reversedirection(Tuplesortstate *state) */ static unsigned int -getlen(Tuplesortstate *state, int tapenum, bool eofOK) +getlen(LogicalTape *tape, bool eofOK) { unsigned int len; - if (LogicalTapeRead(state->tapeset, tapenum, + if (LogicalTapeRead(tape, &len, sizeof(len)) != sizeof(len)) elog(ERROR, "unexpected end of tape"); if (len == 0 && !eofOK) @@ -3710,11 +3714,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK) } static void -markrunend(Tuplesortstate *state, int tapenum) +markrunend(LogicalTape *tape) { unsigned int len = 0; - 
LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); + LogicalTapeWrite(tape, (void *) &len, sizeof(len)); } /* @@ -3892,7 +3896,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { MinimalTuple tuple = (MinimalTuple) stup->tuple; @@ -3903,13 +3907,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) /* total on-disk footprint: */ unsigned int tuplen = tupbodylen + sizeof(int); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) tupbody, tupbodylen); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -3920,7 +3921,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_heap(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tupbodylen = len - sizeof(int); unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; @@ -3930,11 +3931,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup, /* read in the tuple proper */ tuple->t_len = tuplen; - LogicalTapeReadExact(state->tapeset, tapenum, - tupbody, tupbodylen); + LogicalTapeReadExact(tape, tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? 
*/ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value */ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; @@ -4135,21 +4134,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { HeapTuple tuple = (HeapTuple) stup->tuple; unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); /* We need to store t_self, but not other fields of HeapTupleData */ - LogicalTapeWrite(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - &tuple->t_self, sizeof(ItemPointerData)); - LogicalTapeWrite(state->tapeset, tapenum, - tuple->t_data, tuple->t_len); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData)); + LogicalTapeWrite(tape, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? 
*/ - LogicalTapeWrite(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -4160,7 +4155,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int tuplen) + LogicalTape *tape, unsigned int tuplen) { unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); HeapTuple tuple = (HeapTuple) readtup_alloc(state, @@ -4169,16 +4164,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup, /* Reconstruct the HeapTupleData header */ tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); tuple->t_len = t_len; - LogicalTapeReadExact(state->tapeset, tapenum, - &tuple->t_self, sizeof(ItemPointerData)); + LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData)); /* We don't currently bother to reconstruct t_tableOid */ tuple->t_tableOid = InvalidOid; /* Read in the tuple body */ - LogicalTapeReadExact(state->tapeset, tapenum, - tuple->t_data, tuple->t_len); + LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? 
*/ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value, if it's a simple column */ if (state->indexInfo->ii_IndexAttrNumbers[0] != 0) @@ -4392,19 +4384,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { IndexTuple tuple = (IndexTuple) stup->tuple; unsigned int tuplen; tuplen = IndexTupleSize(tuple) + sizeof(tuplen); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) tuple, IndexTupleSize(tuple)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple)); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -4415,16 +4404,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_index(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tuplen = len - sizeof(unsigned int); IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); - LogicalTapeReadExact(state->tapeset, tapenum, - tuple, tuplen); + LogicalTapeReadExact(tape, tuple, tuplen); if (state->randomAccess) /* need trailing length word? 
*/ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value */ stup->datum1 = index_getattr(tuple, @@ -4466,7 +4453,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { void *waddr; unsigned int tuplen; @@ -4491,13 +4478,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) writtenlen = tuplen + sizeof(unsigned int); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &writtenlen, sizeof(writtenlen)); - LogicalTapeWrite(state->tapeset, tapenum, - waddr, tuplen); + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); + LogicalTapeWrite(tape, waddr, tuplen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &writtenlen, sizeof(writtenlen)); + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); if (!state->slabAllocatorUsed && stup->tuple) { @@ -4508,7 +4492,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_datum(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tuplen = len - sizeof(unsigned int); @@ -4522,8 +4506,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, else if (!state->tuples) { Assert(tuplen == sizeof(Datum)); - LogicalTapeReadExact(state->tapeset, tapenum, - &stup->datum1, tuplen); + LogicalTapeReadExact(tape, &stup->datum1, tuplen); stup->isnull1 = false; stup->tuple = NULL; } @@ -4531,16 +4514,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, { void *raddr = readtup_alloc(state, tuplen); - LogicalTapeReadExact(state->tapeset, tapenum, - raddr, tuplen); + LogicalTapeReadExact(tape, raddr, tuplen); 
stup->datum1 = PointerGetDatum(raddr); stup->isnull1 = false; stup->tuple = raddr; } if (state->randomAccess) /* need trailing length word? */ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); } /* @@ -4652,7 +4633,7 @@ worker_freeze_result_tape(Tuplesortstate *state) TapeShare output; Assert(WORKER(state)); - Assert(state->result_tape != -1); + Assert(state->result_tape != NULL); Assert(state->memtupcount == 0); /* @@ -4668,7 +4649,7 @@ worker_freeze_result_tape(Tuplesortstate *state) * Parallel worker requires result tape metadata, which is to be stored in * shared memory for leader */ - LogicalTapeFreeze(state->tapeset, state->result_tape, &output); + LogicalTapeFreeze(state->result_tape, &output); /* Store properties of output tape, and update finished worker count */ SpinLockAcquire(&shared->mutex); @@ -4687,9 +4668,9 @@ static void worker_nomergeruns(Tuplesortstate *state) { Assert(WORKER(state)); - Assert(state->result_tape == -1); + Assert(state->result_tape == NULL); - state->result_tape = state->tp_tapenum[state->destTape]; + state->result_tape = state->tapes[state->tp_tapenum[state->destTape]]; worker_freeze_result_tape(state); } @@ -4733,9 +4714,13 @@ leader_takeover_tapes(Tuplesortstate *state) * randomAccess is disallowed for parallel sorts. 
*/ inittapestate(state, nParticipants + 1); - state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false, - shared->tapes, &shared->fileset, + state->tapeset = LogicalTapeSetCreate(false, + &shared->fileset, state->worker); + state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *)); + for (j = 0; j < nParticipants; j++) + state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]); + /* tapes[nParticipants] represents the "leader tape", which is not used */ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ state->currentRun = nParticipants; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 37cb4f3d59..2e8cbee69f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -41,6 +41,7 @@ struct ExprContext; struct RangeTblEntry; /* avoid including parsenodes.h here */ struct ExprEvalStep; /* avoid including execExpr.h everywhere */ struct CopyMultiInsertBuffer; +struct LogicalTapeSet; /* ---------------- @@ -2316,7 +2317,7 @@ typedef struct AggState bool table_filled; /* hash table filled yet? */ int num_hashes; MemoryContext hash_metacxt; /* memory for hash table itself */ - struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */ + struct LogicalTapeSet *hash_tapeset; /* tape set for hash spill tapes */ struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set, * exists only during first pass */ TupleTableSlot *hash_spill_rslot; /* for reading spill files */ diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index 85d2e03c63..758a19779c 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -18,9 +18,13 @@ #include "storage/sharedfileset.h" -/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */ - +/* + * LogicalTapeSet and LogicalTape are opaque types whose details are not + * known outside logtape.c. 
+ */ typedef struct LogicalTapeSet LogicalTapeSet; +typedef struct LogicalTape LogicalTape; + /* * The approach tuplesort.c takes to parallel external sorts is that workers, @@ -54,27 +58,20 @@ typedef struct TapeShare * prototypes for functions in logtape.c */ -extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate, - TapeShare *shared, +extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker); +extern void LogicalTapeClose(LogicalTape *lt); extern void LogicalTapeSetClose(LogicalTapeSet *lts); +extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts); +extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared); extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts); -extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, - void *ptr, size_t size); -extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, - void *ptr, size_t size); -extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, - size_t buffer_size); -extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum); -extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, - TapeShare *share); -extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional); -extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, - size_t size); -extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, - long blocknum, int offset); -extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, - long *blocknum, int *offset); +extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size); +extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size); +extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size); +extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share); +extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size); +extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset); +extern 
void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); #endif /* LOGTAPE_H */ -- 2.30.2