Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * logtape.c
4 : * Management of "logical tapes" within temporary files.
5 : *
6 : * This module exists to support sorting via multiple merge passes (see
7 : * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 : * we implement it on disk by creating a separate file for each "tape",
9 : * there is an annoying problem: the peak space usage is at least twice
10 : * the volume of actual data to be sorted. (This must be so because each
11 : * datum will appear in both the input and output tapes of the final
12 : * merge pass. For seven-tape polyphase merge, which is otherwise a
13 : * pretty good algorithm, peak usage is more like 4x actual data volume.)
14 : *
15 : * We can work around this problem by recognizing that any one tape
16 : * dataset (with the possible exception of the final output) is written
17 : * and read exactly once in a perfectly sequential manner. Therefore,
18 : * a datum once read will not be required again, and we can recycle its
19 : * space for use by the new tape dataset(s) being generated. In this way,
20 : * the total space usage is essentially just the actual data volume, plus
21 : * insignificant bookkeeping and start/stop overhead.
22 : *
23 : * Few OSes allow arbitrary parts of a file to be released back to the OS,
24 : * so we have to implement this space-recycling ourselves within a single
25 : * logical file. logtape.c exists to perform this bookkeeping and provide
26 : * the illusion of N independent tape devices to tuplesort.c. Note that
27 : * logtape.c itself depends on buffile.c to provide a "logical file" of
28 : * larger size than the underlying OS may support.
29 : *
30 : * For simplicity, we allocate and release space in the underlying file
31 : * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32 : * of which blocks in the underlying file belong to which logical tape,
33 : * plus any blocks that are free (recycled and not yet reused).
34 : * The blocks in each logical tape form a chain, with a prev- and next-
35 : * pointer in each block.
36 : *
37 : * The initial write pass is guaranteed to fill the underlying file
38 : * perfectly sequentially, no matter how data is divided into logical tapes.
39 : * Once we begin merge passes, the access pattern becomes considerably
40 : * less predictable --- but the seeking involved should be comparable to
41 : * what would happen if we kept each logical tape in a separate file,
42 : * so there's no serious performance penalty paid to obtain the space
43 : * savings of recycling. We try to localize the write accesses by always
44 : * writing to the lowest-numbered free block when we have a choice; it's
45 : * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46 : * policy for free blocks would be better?)
47 : *
48 : * To further make the I/Os more sequential, we can use a larger buffer
49 : * when reading, and read multiple blocks from the same tape in one go,
50 : * whenever the buffer becomes empty. LogicalTapeAssignReadBufferSize()
51 : * can be used to set the size of the read buffer.
52 : *
53 : * To support the above policy of writing to the lowest free block,
54 : * ltsGetFreeBlock sorts the list of free block numbers into decreasing
55 : * order each time it is asked for a block and the list isn't currently
56 : * sorted. This is an efficient way to handle it because we expect cycles
57 : * of releasing many blocks followed by re-using many blocks, due to
58 : * the larger read buffer.
59 : *
60 : * Since all the bookkeeping and buffer memory is allocated with palloc(),
61 : * and the underlying file(s) are made with OpenTemporaryFile, all resources
62 : * for a logical tape set are certain to be cleaned up even if processing
63 : * is aborted by ereport(ERROR). To avoid confusion, the caller should take
64 : * care that all calls for a single LogicalTapeSet are made in the same
65 : * palloc context.
66 : *
67 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
68 : * Portions Copyright (c) 1994, Regents of the University of California
69 : *
70 : * IDENTIFICATION
71 : * src/backend/utils/sort/logtape.c
72 : *
73 : *-------------------------------------------------------------------------
74 : */
75 :
76 : #include "postgres.h"
77 :
78 : #include "storage/buffile.h"
79 : #include "utils/logtape.h"
80 : #include "utils/memutils.h"
81 :
82 : /*
83 : * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
84 : *
85 : * The first block of a tape has prev == -1. The last block of a tape
86 : * stores the number of valid bytes on the block, inverted, in 'next'
87 : * Therefore next < 0 indicates the last block.
88 : */
89 : typedef struct TapeBlockTrailer
90 : {
91 : long prev; /* previous block on this tape, or -1 on first
92 : * block */
93 : long next; /* next block on this tape, or # of valid
94 : * bytes on last block (if < 0) */
95 : } TapeBlockTrailer;
96 :
97 : #define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
98 : #define TapeBlockGetTrailer(buf) \
99 : ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
100 :
101 : #define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
102 : #define TapeBlockGetNBytes(buf) \
103 : (TapeBlockIsLast(buf) ? \
104 : (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
105 : #define TapeBlockSetNBytes(buf, nbytes) \
106 : (TapeBlockGetTrailer(buf)->next = -(nbytes))
107 :
108 :
109 : /*
110 : * This data structure represents a single "logical tape" within the set
111 : * of logical tapes stored in the same file.
112 : *
113 : * While writing, we hold the current partially-written data block in the
114 : * buffer. While reading, we can hold multiple blocks in the buffer. Note
115 : * that we don't retain the trailers of a block when it's read into the
116 : * buffer. The buffer therefore contains one large contiguous chunk of data
117 : * from the tape.
118 : */
119 : typedef struct LogicalTape
120 : {
121 : bool writing; /* T while in write phase */
122 : bool frozen; /* T if blocks should not be freed when read */
123 : bool dirty; /* does buffer need to be written? */
124 :
125 : /*
126 : * Block numbers of the first, current, and next block of the tape.
127 : *
128 : * The "current" block number is only valid when writing, or reading from
129 : * a frozen tape. (When reading from an unfrozen tape, we use a larger
130 : * read buffer that holds multiple blocks, so the "current" block is
131 : * ambiguous.)
132 : */
133 : long firstBlockNumber;
134 : long curBlockNumber;
135 : long nextBlockNumber;
136 :
137 : /*
138 : * Buffer for current data block(s).
139 : */
140 : char *buffer; /* physical buffer (separately palloc'd) */
141 : int buffer_size; /* allocated size of the buffer */
142 : int pos; /* next read/write position in buffer */
143 : int nbytes; /* total # of valid bytes in buffer */
144 : } LogicalTape;
145 :
146 : /*
147 : * This data structure represents a set of related "logical tapes" sharing
148 : * space in a single underlying file. (But that "file" may be multiple files
149 : * if needed to escape OS limits on file size; buffile.c handles that for us.)
150 : * The number of tapes is fixed at creation.
151 : */
152 : struct LogicalTapeSet
153 : {
154 : BufFile *pfile; /* underlying file for whole tape set */
155 :
156 : /*
157 : * File size tracking. nBlocksWritten is the size of the underlying file,
158 : * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
159 : * by ltsGetFreeBlock(), and it is always greater than or equal to
160 : * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
161 : * blocks that have been allocated for a tape, but have not been written
162 : * to the underlying file yet.
163 : */
164 : long nBlocksAllocated; /* # of blocks allocated */
165 : long nBlocksWritten; /* # of blocks used in underlying file */
166 :
167 : /*
168 : * We store the numbers of recycled-and-available blocks in freeBlocks[].
169 : * When there are no such blocks, we extend the underlying file.
170 : *
171 : * If forgetFreeSpace is true then any freed blocks are simply forgotten
172 : * rather than being remembered in freeBlocks[]. See notes for
173 : * LogicalTapeSetForgetFreeSpace().
174 : *
175 : * If blocksSorted is true then the block numbers in freeBlocks are in
176 : * *decreasing* order, so that removing the last entry gives us the lowest
177 : * free block. We re-sort the blocks whenever a block is demanded; this
178 : * should be reasonably efficient given the expected usage pattern.
179 : */
180 : bool forgetFreeSpace; /* are we remembering free blocks? */
181 : bool blocksSorted; /* is freeBlocks[] currently in order? */
182 : long *freeBlocks; /* resizable array */
183 : int nFreeBlocks; /* # of currently free blocks */
184 : int freeBlocksLen; /* current allocated length of freeBlocks[] */
185 :
186 : /* The array of logical tapes. */
187 : int nTapes; /* # of logical tapes in set */
188 : LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
189 : };
190 :
191 : static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
192 : static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
193 : static long ltsGetFreeBlock(LogicalTapeSet *lts);
194 : static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
195 :
196 :
197 : /*
198 : * Write a block-sized buffer to the specified block of the underlying file.
199 : *
200 : * No need for an error return convention; we ereport() on any error.
201 : */
202 : static void
203 688 : ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
204 : {
205 : /*
206 : * BufFile does not support "holes", so if we're about to write a block
207 : * that's past the current end of file, fill the space between the current
208 : * end of file and the target block with zeros.
209 : *
210 : * This should happen rarely, otherwise you are not writing very
211 : * sequentially. In current use, this only happens when the sort ends
212 : * writing a run, and switches to another tape. The last block of the
213 : * previous tape isn't flushed to disk until the end of the sort, so you
214 : * get one-block hole, where the last block of the previous tape will
215 : * later go.
216 : */
217 1381 : while (blocknum > lts->nBlocksWritten)
218 : {
219 : char zerobuf[BLCKSZ];
220 :
221 5 : MemSet(zerobuf, 0, sizeof(zerobuf));
222 :
223 5 : ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf);
224 : }
225 :
226 : /* Write the requested block */
227 1376 : if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
228 688 : BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
229 0 : ereport(ERROR,
230 : (errcode_for_file_access(),
231 : errmsg("could not write block %ld of temporary file: %m",
232 : blocknum)));
233 :
234 : /* Update nBlocksWritten, if we extended the file */
235 688 : if (blocknum == lts->nBlocksWritten)
236 683 : lts->nBlocksWritten++;
237 688 : }
238 :
239 : /*
240 : * Read a block-sized buffer from the specified block of the underlying file.
241 : *
242 : * No need for an error return convention; we ereport() on any error. This
243 : * module should never attempt to read a block it doesn't know is there.
244 : */
245 : static void
246 683 : ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
247 : {
248 1366 : if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
249 683 : BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
250 0 : ereport(ERROR,
251 : (errcode_for_file_access(),
252 : errmsg("could not read block %ld of temporary file: %m",
253 : blocknum)));
254 683 : }
255 :
256 : /*
257 : * Read as many blocks as we can into the per-tape buffer.
258 : *
259 : * Returns true if anything was read, 'false' on EOF.
260 : */
261 : static bool
262 361 : ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
263 : {
264 361 : lt->pos = 0;
265 361 : lt->nbytes = 0;
266 :
267 : do
268 : {
269 682 : char *thisbuf = lt->buffer + lt->nbytes;
270 :
271 : /* Fetch next block number */
272 682 : if (lt->nextBlockNumber == -1L)
273 0 : break; /* EOF */
274 :
275 : /* Read the block */
276 682 : ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
277 682 : if (!lt->frozen)
278 343 : ltsReleaseBlock(lts, lt->nextBlockNumber);
279 682 : lt->curBlockNumber = lt->nextBlockNumber;
280 :
281 682 : lt->nbytes += TapeBlockGetNBytes(thisbuf);
282 682 : if (TapeBlockIsLast(thisbuf))
283 : {
284 7 : lt->nextBlockNumber = -1L;
285 : /* EOF */
286 7 : break;
287 : }
288 : else
289 675 : lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
290 :
291 : /* Advance to next block, if we have buffer space left */
292 675 : } while (lt->buffer_size - lt->nbytes > BLCKSZ);
293 :
294 361 : return (lt->nbytes > 0);
295 : }
296 :
297 : /*
298 : * qsort comparator for sorting freeBlocks[] into decreasing order.
299 : */
300 : static int
301 0 : freeBlocks_cmp(const void *a, const void *b)
302 : {
303 0 : long ablk = *((const long *) a);
304 0 : long bblk = *((const long *) b);
305 :
306 : /* can't just subtract because long might be wider than int */
307 0 : if (ablk < bblk)
308 0 : return 1;
309 0 : if (ablk > bblk)
310 0 : return -1;
311 0 : return 0;
312 : }
313 :
314 : /*
315 : * Select a currently unused block for writing to.
316 : */
317 : static long
318 683 : ltsGetFreeBlock(LogicalTapeSet *lts)
319 : {
320 : /*
321 : * If there are multiple free blocks, we select the one appearing last in
322 : * freeBlocks[] (after sorting the array if needed). If there are none,
323 : * assign the next block at the end of the file.
324 : */
325 683 : if (lts->nFreeBlocks > 0)
326 : {
327 0 : if (!lts->blocksSorted)
328 : {
329 0 : qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
330 : sizeof(long), freeBlocks_cmp);
331 0 : lts->blocksSorted = true;
332 : }
333 0 : return lts->freeBlocks[--lts->nFreeBlocks];
334 : }
335 : else
336 683 : return lts->nBlocksAllocated++;
337 : }
338 :
339 : /*
340 : * Return a block# to the freelist.
341 : */
342 : static void
343 343 : ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
344 : {
345 : int ndx;
346 :
347 : /*
348 : * Do nothing if we're no longer interested in remembering free space.
349 : */
350 343 : if (lts->forgetFreeSpace)
351 566 : return;
352 :
353 : /*
354 : * Enlarge freeBlocks array if full.
355 : */
356 120 : if (lts->nFreeBlocks >= lts->freeBlocksLen)
357 : {
358 2 : lts->freeBlocksLen *= 2;
359 2 : lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
360 2 : lts->freeBlocksLen * sizeof(long));
361 : }
362 :
363 : /*
364 : * Add blocknum to array, and mark the array unsorted if it's no longer in
365 : * decreasing order.
366 : */
367 120 : ndx = lts->nFreeBlocks++;
368 120 : lts->freeBlocks[ndx] = blocknum;
369 120 : if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
370 119 : lts->blocksSorted = false;
371 : }
372 :
373 : /*
374 : * Create a set of logical tapes in a temporary underlying file.
375 : *
376 : * Each tape is initialized in write state.
377 : */
378 : LogicalTapeSet *
379 2 : LogicalTapeSetCreate(int ntapes)
380 : {
381 : LogicalTapeSet *lts;
382 : LogicalTape *lt;
383 : int i;
384 :
385 : /*
386 : * Create top-level struct including per-tape LogicalTape structs.
387 : */
388 2 : Assert(ntapes > 0);
389 2 : lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
390 : ntapes * sizeof(LogicalTape));
391 2 : lts->pfile = BufFileCreateTemp(false);
392 2 : lts->nBlocksAllocated = 0L;
393 2 : lts->nBlocksWritten = 0L;
394 2 : lts->forgetFreeSpace = false;
395 2 : lts->blocksSorted = true; /* a zero-length array is sorted ... */
396 2 : lts->freeBlocksLen = 32; /* reasonable initial guess */
397 2 : lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
398 2 : lts->nFreeBlocks = 0;
399 2 : lts->nTapes = ntapes;
400 :
401 : /*
402 : * Initialize per-tape structs. Note we allocate the I/O buffer and the
403 : * first block for a tape only when it is first actually written to. This
404 : * avoids wasting memory space when tuplesort.c overestimates the number
405 : * of tapes needed.
406 : */
407 16 : for (i = 0; i < ntapes; i++)
408 : {
409 14 : lt = <s->tapes[i];
410 14 : lt->writing = true;
411 14 : lt->frozen = false;
412 14 : lt->dirty = false;
413 14 : lt->firstBlockNumber = -1L;
414 14 : lt->curBlockNumber = -1L;
415 14 : lt->buffer = NULL;
416 14 : lt->buffer_size = 0;
417 14 : lt->pos = 0;
418 14 : lt->nbytes = 0;
419 : }
420 2 : return lts;
421 : }
422 :
423 : /*
424 : * Close a logical tape set and release all resources.
425 : */
426 : void
427 2 : LogicalTapeSetClose(LogicalTapeSet *lts)
428 : {
429 : LogicalTape *lt;
430 : int i;
431 :
432 2 : BufFileClose(lts->pfile);
433 16 : for (i = 0; i < lts->nTapes; i++)
434 : {
435 14 : lt = <s->tapes[i];
436 14 : if (lt->buffer)
437 1 : pfree(lt->buffer);
438 : }
439 2 : pfree(lts->freeBlocks);
440 2 : pfree(lts);
441 2 : }
442 :
443 : /*
444 : * Mark a logical tape set as not needing management of free space anymore.
445 : *
446 : * This should be called if the caller does not intend to write any more data
447 : * into the tape set, but is reading from un-frozen tapes. Since no more
448 : * writes are planned, remembering free blocks is no longer useful. Setting
449 : * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
450 : * is not designed to handle large numbers of free blocks.
451 : */
452 : void
453 1 : LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
454 : {
455 1 : lts->forgetFreeSpace = true;
456 1 : }
457 :
458 : /*
459 : * Write to a logical tape.
460 : *
461 : * There are no error returns; we ereport() on failure.
462 : */
463 : void
464 60007 : LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
465 : void *ptr, size_t size)
466 : {
467 : LogicalTape *lt;
468 : size_t nthistime;
469 :
470 60007 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
471 60007 : lt = <s->tapes[tapenum];
472 60007 : Assert(lt->writing);
473 :
474 : /* Allocate data buffer and first block on first write */
475 60007 : if (lt->buffer == NULL)
476 : {
477 7 : lt->buffer = (char *) palloc(BLCKSZ);
478 7 : lt->buffer_size = BLCKSZ;
479 : }
480 60007 : if (lt->curBlockNumber == -1)
481 : {
482 7 : Assert(lt->firstBlockNumber == -1);
483 7 : Assert(lt->pos == 0);
484 :
485 7 : lt->curBlockNumber = ltsGetFreeBlock(lts);
486 7 : lt->firstBlockNumber = lt->curBlockNumber;
487 :
488 7 : TapeBlockGetTrailer(lt->buffer)->prev = -1L;
489 : }
490 :
491 60007 : Assert(lt->buffer_size == BLCKSZ);
492 180685 : while (size > 0)
493 : {
494 60671 : if (lt->pos >= TapeBlockPayloadSize)
495 : {
496 : /* Buffer full, dump it out */
497 : long nextBlockNumber;
498 :
499 676 : if (!lt->dirty)
500 : {
501 : /* Hmm, went directly from reading to writing? */
502 0 : elog(ERROR, "invalid logtape state: should be dirty");
503 : }
504 :
505 : /*
506 : * First allocate the next block, so that we can store it in the
507 : * 'next' pointer of this block.
508 : */
509 676 : nextBlockNumber = ltsGetFreeBlock(lts);
510 :
511 : /* set the next-pointer and dump the current block. */
512 676 : TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
513 676 : ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
514 :
515 : /* initialize the prev-pointer of the next block */
516 676 : TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
517 676 : lt->curBlockNumber = nextBlockNumber;
518 676 : lt->pos = 0;
519 676 : lt->nbytes = 0;
520 : }
521 :
522 60671 : nthistime = TapeBlockPayloadSize - lt->pos;
523 60671 : if (nthistime > size)
524 59995 : nthistime = size;
525 60671 : Assert(nthistime > 0);
526 :
527 60671 : memcpy(lt->buffer + lt->pos, ptr, nthistime);
528 :
529 60671 : lt->dirty = true;
530 60671 : lt->pos += nthistime;
531 60671 : if (lt->nbytes < lt->pos)
532 60671 : lt->nbytes = lt->pos;
533 60671 : ptr = (void *) ((char *) ptr + nthistime);
534 60671 : size -= nthistime;
535 : }
536 60007 : }
537 :
538 : /*
539 : * Rewind logical tape and switch from writing to reading.
540 : *
541 : * The tape must currently be in writing state, or "frozen" in read state.
542 : *
543 : * 'buffer_size' specifies how much memory to use for the read buffer.
544 : * Regardless of the argument, the actual amount of memory used is between
545 : * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
546 : * rounded down and truncated to fit those constraints, if necessary. If the
547 : * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
548 : * byte buffer is used.
549 : */
550 : void
551 6 : LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
552 : {
553 : LogicalTape *lt;
554 :
555 6 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
556 6 : lt = <s->tapes[tapenum];
557 :
558 : /*
559 : * Round and cap buffer_size if needed.
560 : */
561 6 : if (lt->frozen)
562 0 : buffer_size = BLCKSZ;
563 : else
564 : {
565 : /* need at least one block */
566 6 : if (buffer_size < BLCKSZ)
567 0 : buffer_size = BLCKSZ;
568 :
569 : /*
570 : * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
571 : * buffer is unlikely to be helpful, anyway)
572 : */
573 6 : if (buffer_size > MaxAllocSize)
574 0 : buffer_size = MaxAllocSize;
575 :
576 : /* round down to BLCKSZ boundary */
577 6 : buffer_size -= buffer_size % BLCKSZ;
578 : }
579 :
580 6 : if (lt->writing)
581 : {
582 : /*
583 : * Completion of a write phase. Flush last partial data block, and
584 : * rewind for normal (destructive) read.
585 : */
586 6 : if (lt->dirty)
587 : {
588 6 : TapeBlockSetNBytes(lt->buffer, lt->nbytes);
589 6 : ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
590 : }
591 6 : lt->writing = false;
592 : }
593 : else
594 : {
595 : /*
596 : * This is only OK if tape is frozen; we rewind for (another) read
597 : * pass.
598 : */
599 0 : Assert(lt->frozen);
600 : }
601 :
602 : /* Allocate a read buffer (unless the tape is empty) */
603 6 : if (lt->buffer)
604 6 : pfree(lt->buffer);
605 6 : lt->buffer = NULL;
606 6 : lt->buffer_size = 0;
607 6 : if (lt->firstBlockNumber != -1L)
608 : {
609 6 : lt->buffer = palloc(buffer_size);
610 6 : lt->buffer_size = buffer_size;
611 : }
612 :
613 : /* Read the first block, or reset if tape is empty */
614 6 : lt->nextBlockNumber = lt->firstBlockNumber;
615 6 : lt->pos = 0;
616 6 : lt->nbytes = 0;
617 6 : ltsReadFillBuffer(lts, lt);
618 6 : }
619 :
620 : /*
621 : * Rewind logical tape and switch from reading to writing.
622 : *
623 : * NOTE: we assume the caller has read the tape to the end; otherwise
624 : * untouched data will not have been freed. We could add more code to free
625 : * any unread blocks, but in current usage of this module it'd be useless
626 : * code.
627 : */
628 : void
629 6 : LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
630 : {
631 : LogicalTape *lt;
632 :
633 6 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
634 6 : lt = <s->tapes[tapenum];
635 :
636 6 : Assert(!lt->writing && !lt->frozen);
637 6 : lt->writing = true;
638 6 : lt->dirty = false;
639 6 : lt->firstBlockNumber = -1L;
640 6 : lt->curBlockNumber = -1L;
641 6 : lt->pos = 0;
642 6 : lt->nbytes = 0;
643 6 : if (lt->buffer)
644 6 : pfree(lt->buffer);
645 6 : lt->buffer = NULL;
646 6 : lt->buffer_size = 0;
647 6 : }
648 :
649 : /*
650 : * Read from a logical tape.
651 : *
652 : * Early EOF is indicated by return value less than #bytes requested.
653 : */
654 : size_t
655 60007 : LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
656 : void *ptr, size_t size)
657 : {
658 : LogicalTape *lt;
659 60007 : size_t nread = 0;
660 : size_t nthistime;
661 :
662 60007 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
663 60007 : lt = <s->tapes[tapenum];
664 60007 : Assert(!lt->writing);
665 :
666 180369 : while (size > 0)
667 : {
668 60355 : if (lt->pos >= lt->nbytes)
669 : {
670 : /* Try to load more data into buffer. */
671 355 : if (!ltsReadFillBuffer(lts, lt))
672 0 : break; /* EOF */
673 : }
674 :
675 60355 : nthistime = lt->nbytes - lt->pos;
676 60355 : if (nthistime > size)
677 59993 : nthistime = size;
678 60355 : Assert(nthistime > 0);
679 :
680 60355 : memcpy(ptr, lt->buffer + lt->pos, nthistime);
681 :
682 60355 : lt->pos += nthistime;
683 60355 : ptr = (void *) ((char *) ptr + nthistime);
684 60355 : size -= nthistime;
685 60355 : nread += nthistime;
686 : }
687 :
688 60007 : return nread;
689 : }
690 :
691 : /*
692 : * "Freeze" the contents of a tape so that it can be read multiple times
693 : * and/or read backwards. Once a tape is frozen, its contents will not
694 : * be released until the LogicalTapeSet is destroyed. This is expected
695 : * to be used only for the final output pass of a merge.
696 : *
697 : * This *must* be called just at the end of a write pass, before the
698 : * tape is rewound (after rewind is too late!). It performs a rewind
699 : * and switch to read mode "for free". An immediately following rewind-
700 : * for-read call is OK but not necessary.
701 : */
702 : void
703 1 : LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
704 : {
705 : LogicalTape *lt;
706 :
707 1 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
708 1 : lt = <s->tapes[tapenum];
709 1 : Assert(lt->writing);
710 :
711 : /*
712 : * Completion of a write phase. Flush last partial data block, and rewind
713 : * for nondestructive read.
714 : */
715 1 : if (lt->dirty)
716 : {
717 1 : TapeBlockSetNBytes(lt->buffer, lt->nbytes);
718 1 : ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
719 1 : lt->writing = false;
720 : }
721 1 : lt->writing = false;
722 1 : lt->frozen = true;
723 :
724 : /*
725 : * The seek and backspace functions assume a single block read buffer.
726 : * That's OK with current usage. A larger buffer is helpful to make the
727 : * read pattern of the backing file look more sequential to the OS, when
728 : * we're reading from multiple tapes. But at the end of a sort, when a
729 : * tape is frozen, we only read from a single tape anyway.
730 : */
731 1 : if (!lt->buffer || lt->buffer_size != BLCKSZ)
732 : {
733 0 : if (lt->buffer)
734 0 : pfree(lt->buffer);
735 0 : lt->buffer = palloc(BLCKSZ);
736 0 : lt->buffer_size = BLCKSZ;
737 : }
738 :
739 : /* Read the first block, or reset if tape is empty */
740 1 : lt->curBlockNumber = lt->firstBlockNumber;
741 1 : lt->pos = 0;
742 1 : lt->nbytes = 0;
743 :
744 1 : if (lt->firstBlockNumber == -1L)
745 0 : lt->nextBlockNumber = -1L;
746 1 : ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
747 1 : if (TapeBlockIsLast(lt->buffer))
748 0 : lt->nextBlockNumber = -1L;
749 : else
750 1 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
751 1 : lt->nbytes = TapeBlockGetNBytes(lt->buffer);
752 1 : }
753 :
754 : /*
755 : * Backspace the tape a given number of bytes. (We also support a more
756 : * general seek interface, see below.)
757 : *
758 : * *Only* a frozen-for-read tape can be backed up; we don't support
759 : * random access during write, and an unfrozen read tape may have
760 : * already discarded the desired data!
761 : *
762 : * Returns the number of bytes backed up. It can be less than the
763 : * requested amount, if there isn't that much data before the current
764 : * position. The tape is positioned to the beginning of the tape in
765 : * that case.
766 : */
767 : size_t
768 0 : LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
769 : {
770 : LogicalTape *lt;
771 0 : size_t seekpos = 0;
772 :
773 0 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
774 0 : lt = <s->tapes[tapenum];
775 0 : Assert(lt->frozen);
776 0 : Assert(lt->buffer_size == BLCKSZ);
777 :
778 : /*
779 : * Easy case for seek within current block.
780 : */
781 0 : if (size <= (size_t) lt->pos)
782 : {
783 0 : lt->pos -= (int) size;
784 0 : return size;
785 : }
786 :
787 : /*
788 : * Not-so-easy case, have to walk back the chain of blocks. This
789 : * implementation would be pretty inefficient for long seeks, but we
790 : * really aren't doing that (a seek over one tuple is typical).
791 : */
792 0 : seekpos = (size_t) lt->pos; /* part within this block */
793 0 : while (size > seekpos)
794 : {
795 0 : long prev = TapeBlockGetTrailer(lt->buffer)->prev;
796 :
797 0 : if (prev == -1L)
798 : {
799 : /* Tried to back up beyond the beginning of tape. */
800 0 : if (lt->curBlockNumber != lt->firstBlockNumber)
801 0 : elog(ERROR, "unexpected end of tape");
802 0 : lt->pos = 0;
803 0 : return seekpos;
804 : }
805 :
806 0 : ltsReadBlock(lts, prev, (void *) lt->buffer);
807 :
808 0 : if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
809 0 : elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
810 : prev,
811 : TapeBlockGetTrailer(lt->buffer)->next,
812 : lt->curBlockNumber);
813 :
814 0 : lt->nbytes = TapeBlockPayloadSize;
815 0 : lt->curBlockNumber = prev;
816 0 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
817 :
818 0 : seekpos += TapeBlockPayloadSize;
819 : }
820 :
821 : /*
822 : * 'seekpos' can now be greater than 'size', because it points to the
823 : * beginning the target block. The difference is the position within the
824 : * page.
825 : */
826 0 : lt->pos = seekpos - size;
827 0 : return size;
828 : }
829 :
830 : /*
831 : * Seek to an arbitrary position in a logical tape.
832 : *
833 : * *Only* a frozen-for-read tape can be seeked.
834 : *
835 : * Must be called with a block/offset previously returned by
836 : * LogicalTapeTell().
837 : */
838 : void
839 0 : LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
840 : long blocknum, int offset)
841 : {
842 : LogicalTape *lt;
843 :
844 0 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
845 0 : lt = <s->tapes[tapenum];
846 0 : Assert(lt->frozen);
847 0 : Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
848 0 : Assert(lt->buffer_size == BLCKSZ);
849 :
850 0 : if (blocknum != lt->curBlockNumber)
851 : {
852 0 : ltsReadBlock(lts, blocknum, (void *) lt->buffer);
853 0 : lt->curBlockNumber = blocknum;
854 0 : lt->nbytes = TapeBlockPayloadSize;
855 0 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
856 : }
857 :
858 0 : if (offset > lt->nbytes)
859 0 : elog(ERROR, "invalid tape seek position");
860 0 : lt->pos = offset;
861 0 : }
862 :
863 : /*
864 : * Obtain current position in a form suitable for a later LogicalTapeSeek.
865 : *
866 : * NOTE: it'd be OK to do this during write phase with intention of using
867 : * the position for a seek after freezing. Not clear if anyone needs that.
868 : */
869 : void
870 0 : LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
871 : long *blocknum, int *offset)
872 : {
873 : LogicalTape *lt;
874 :
875 0 : Assert(tapenum >= 0 && tapenum < lts->nTapes);
876 0 : lt = <s->tapes[tapenum];
877 :
878 : /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
879 0 : Assert(lt->buffer_size == BLCKSZ);
880 :
881 0 : *blocknum = lt->curBlockNumber;
882 0 : *offset = lt->pos;
883 0 : }
884 :
885 : /*
886 : * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
887 : */
888 : long
889 2 : LogicalTapeSetBlocks(LogicalTapeSet *lts)
890 : {
891 2 : return lts->nBlocksAllocated;
892 : }
|