Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * inv_api.c
4 : * routines for manipulating inversion fs large objects. This file
5 : * contains the user-level large object application interface routines.
6 : *
7 : *
8 : * Note: we access pg_largeobject.data using its C struct declaration.
9 : * This is safe because it immediately follows pageno which is an int4 field,
10 : * and therefore the data field will always be 4-byte aligned, even if it
11 : * is in the short 1-byte-header format. We have to detoast it since it's
12 : * quite likely to be in compressed or short format. We also need to check
13 : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : *
15 : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : * does most of the backend code. We expect that CurrentMemoryContext will
17 : * be a short-lived context. Data that must persist across function calls
18 : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : * memory context given to inv_open (for LargeObjectDesc structs).
20 : *
21 : *
22 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
23 : * Portions Copyright (c) 1994, Regents of the University of California
24 : *
25 : *
26 : * IDENTIFICATION
27 : * src/backend/storage/large_object/inv_api.c
28 : *
29 : *-------------------------------------------------------------------------
30 : */
31 : #include "postgres.h"
32 :
33 : #include <limits.h>
34 :
35 : #include "access/genam.h"
36 : #include "access/heapam.h"
37 : #include "access/sysattr.h"
38 : #include "access/tuptoaster.h"
39 : #include "access/xact.h"
40 : #include "catalog/dependency.h"
41 : #include "catalog/indexing.h"
42 : #include "catalog/objectaccess.h"
43 : #include "catalog/pg_largeobject.h"
44 : #include "catalog/pg_largeobject_metadata.h"
45 : #include "libpq/libpq-fs.h"
46 : #include "miscadmin.h"
47 : #include "storage/large_object.h"
48 : #include "utils/fmgroids.h"
49 : #include "utils/rel.h"
50 : #include "utils/snapmgr.h"
51 : #include "utils/tqual.h"
52 :
53 :
54 : /*
55 : * All accesses to pg_largeobject and its index make use of a single Relation
56 : * reference, so that we only need to open pg_largeobject once per transaction.
57 : * To avoid problems when the first such reference occurs inside a
58 : * subtransaction, we execute a slightly klugy maneuver to assign ownership of
59 : * the Relation reference to TopTransactionResourceOwner.
60 : */
/* Cached reference to pg_largeobject; set by open_lo_relation(), reset to NULL by close_lo_relation() */
static Relation lo_heap_r = NULL;
/* Cached reference to the pg_largeobject (loid, pageno) index; same lifecycle as lo_heap_r */
static Relation lo_index_r = NULL;
63 :
64 :
65 : /*
66 : * Open pg_largeobject and its index, if not already done in current xact
67 : */
68 : static void
69 470 : open_lo_relation(void)
70 : {
71 : ResourceOwner currentOwner;
72 :
73 470 : if (lo_heap_r && lo_index_r)
74 906 : return; /* already open in current xact */
75 :
76 : /* Arrange for the top xact to own these relation references */
77 34 : currentOwner = CurrentResourceOwner;
78 34 : PG_TRY();
79 : {
80 34 : CurrentResourceOwner = TopTransactionResourceOwner;
81 :
82 : /* Use RowExclusiveLock since we might either read or write */
83 34 : if (lo_heap_r == NULL)
84 34 : lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
85 34 : if (lo_index_r == NULL)
86 34 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
87 : }
88 0 : PG_CATCH();
89 : {
90 : /* Ensure CurrentResourceOwner is restored on error */
91 0 : CurrentResourceOwner = currentOwner;
92 0 : PG_RE_THROW();
93 : }
94 34 : PG_END_TRY();
95 34 : CurrentResourceOwner = currentOwner;
96 : }
97 :
98 : /*
99 : * Clean up at main transaction end
100 : */
101 : void
102 54 : close_lo_relation(bool isCommit)
103 : {
104 54 : if (lo_heap_r || lo_index_r)
105 : {
106 : /*
107 : * Only bother to close if committing; else abort cleanup will handle
108 : * it
109 : */
110 34 : if (isCommit)
111 : {
112 : ResourceOwner currentOwner;
113 :
114 33 : currentOwner = CurrentResourceOwner;
115 33 : PG_TRY();
116 : {
117 33 : CurrentResourceOwner = TopTransactionResourceOwner;
118 :
119 33 : if (lo_index_r)
120 33 : index_close(lo_index_r, NoLock);
121 33 : if (lo_heap_r)
122 33 : heap_close(lo_heap_r, NoLock);
123 : }
124 0 : PG_CATCH();
125 : {
126 : /* Ensure CurrentResourceOwner is restored on error */
127 0 : CurrentResourceOwner = currentOwner;
128 0 : PG_RE_THROW();
129 : }
130 33 : PG_END_TRY();
131 33 : CurrentResourceOwner = currentOwner;
132 : }
133 34 : lo_heap_r = NULL;
134 34 : lo_index_r = NULL;
135 : }
136 54 : }
137 :
138 :
139 : /*
140 : * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
141 : * read with can be specified.
142 : */
143 : static bool
144 48 : myLargeObjectExists(Oid loid, Snapshot snapshot)
145 : {
146 : Relation pg_lo_meta;
147 : ScanKeyData skey[1];
148 : SysScanDesc sd;
149 : HeapTuple tuple;
150 48 : bool retval = false;
151 :
152 48 : ScanKeyInit(&skey[0],
153 : ObjectIdAttributeNumber,
154 : BTEqualStrategyNumber, F_OIDEQ,
155 : ObjectIdGetDatum(loid));
156 :
157 48 : pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
158 : AccessShareLock);
159 :
160 48 : sd = systable_beginscan(pg_lo_meta,
161 : LargeObjectMetadataOidIndexId, true,
162 : snapshot, 1, skey);
163 :
164 48 : tuple = systable_getnext(sd);
165 48 : if (HeapTupleIsValid(tuple))
166 48 : retval = true;
167 :
168 48 : systable_endscan(sd);
169 :
170 48 : heap_close(pg_lo_meta, AccessShareLock);
171 :
172 48 : return retval;
173 : }
174 :
175 :
176 : /*
177 : * Extract data field from a pg_largeobject tuple, detoasting if needed
178 : * and verifying that the length is sane. Returns data pointer (a bytea *),
179 : * data length, and an indication of whether to pfree the data pointer.
180 : */
181 : static void
182 1680 : getdatafield(Form_pg_largeobject tuple,
183 : bytea **pdatafield,
184 : int *plen,
185 : bool *pfreeit)
186 : {
187 : bytea *datafield;
188 : int len;
189 : bool freeit;
190 :
191 1680 : datafield = &(tuple->data); /* see note at top of file */
192 1680 : freeit = false;
193 1680 : if (VARATT_IS_EXTENDED(datafield))
194 : {
195 1655 : datafield = (bytea *)
196 : heap_tuple_untoast_attr((struct varlena *) datafield);
197 1655 : freeit = true;
198 : }
199 1680 : len = VARSIZE(datafield) - VARHDRSZ;
200 1680 : if (len < 0 || len > LOBLKSIZE)
201 0 : ereport(ERROR,
202 : (errcode(ERRCODE_DATA_CORRUPTED),
203 : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
204 : tuple->loid, tuple->pageno, len)));
205 1680 : *pdatafield = datafield;
206 1680 : *plen = len;
207 1680 : *pfreeit = freeit;
208 1680 : }
209 :
210 :
211 : /*
212 : * inv_create -- create a new large object
213 : *
214 : * Arguments:
215 : * lobjId - OID to use for new large object, or InvalidOid to pick one
216 : *
217 : * Returns:
218 : * OID of new object
219 : *
220 : * If lobjId is not InvalidOid, then an error occurs if the OID is already
221 : * in use.
222 : */
223 : Oid
224 15 : inv_create(Oid lobjId)
225 : {
226 : Oid lobjId_new;
227 :
228 : /*
229 : * Create a new largeobject with empty data pages
230 : */
231 15 : lobjId_new = LargeObjectCreate(lobjId);
232 :
233 : /*
234 : * dependency on the owner of largeobject
235 : *
236 : * The reason why we use LargeObjectRelationId instead of
237 : * LargeObjectMetadataRelationId here is to provide backward compatibility
238 : * to the applications which utilize a knowledge about internal layout of
239 : * system catalogs. OID of pg_largeobject_metadata and loid of
240 : * pg_largeobject are same value, so there are no actual differences here.
241 : */
242 15 : recordDependencyOnOwner(LargeObjectRelationId,
243 : lobjId_new, GetUserId());
244 :
245 : /* Post creation hook for new large object */
246 15 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
247 :
248 : /*
249 : * Advance command counter to make new tuple visible to later operations.
250 : */
251 15 : CommandCounterIncrement();
252 :
253 15 : return lobjId_new;
254 : }
255 :
256 : /*
257 : * inv_open -- access an existing large object.
258 : *
259 : * Returns:
260 : * Large object descriptor, appropriately filled in. The descriptor
261 : * and subsidiary data are allocated in the specified memory context,
262 : * which must be suitably long-lived for the caller's purposes.
263 : */
264 : LargeObjectDesc *
265 48 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
266 : {
267 : LargeObjectDesc *retval;
268 48 : Snapshot snapshot = NULL;
269 48 : int descflags = 0;
270 :
271 48 : if (flags & INV_WRITE)
272 : {
273 25 : snapshot = NULL; /* instantaneous MVCC snapshot */
274 25 : descflags = IFS_WRLOCK | IFS_RDLOCK;
275 : }
276 23 : else if (flags & INV_READ)
277 : {
278 23 : snapshot = GetActiveSnapshot();
279 23 : descflags = IFS_RDLOCK;
280 : }
281 : else
282 0 : ereport(ERROR,
283 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
284 : errmsg("invalid flags for opening a large object: %d",
285 : flags)));
286 :
287 : /* Can't use LargeObjectExists here because we need to specify snapshot */
288 48 : if (!myLargeObjectExists(lobjId, snapshot))
289 0 : ereport(ERROR,
290 : (errcode(ERRCODE_UNDEFINED_OBJECT),
291 : errmsg("large object %u does not exist", lobjId)));
292 :
293 : /*
294 : * We must register the snapshot in TopTransaction's resowner, because it
295 : * must stay alive until the LO is closed rather than until the current
296 : * portal shuts down. Do this after checking that the LO exists, to avoid
297 : * leaking the snapshot if an error is thrown.
298 : */
299 48 : if (snapshot)
300 23 : snapshot = RegisterSnapshotOnOwner(snapshot,
301 : TopTransactionResourceOwner);
302 :
303 : /* All set, create a descriptor */
304 48 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
305 : sizeof(LargeObjectDesc));
306 48 : retval->id = lobjId;
307 48 : retval->subid = GetCurrentSubTransactionId();
308 48 : retval->offset = 0;
309 48 : retval->snapshot = snapshot;
310 48 : retval->flags = descflags;
311 :
312 48 : return retval;
313 : }
314 :
315 : /*
316 : * Closes a large object descriptor previously made by inv_open(), and
317 : * releases the long-term memory used by it.
318 : */
319 : void
320 36 : inv_close(LargeObjectDesc *obj_desc)
321 : {
322 36 : Assert(PointerIsValid(obj_desc));
323 :
324 36 : UnregisterSnapshotFromOwner(obj_desc->snapshot,
325 : TopTransactionResourceOwner);
326 :
327 36 : pfree(obj_desc);
328 36 : }
329 :
330 : /*
331 : * Destroys an existing large object (not to be confused with a descriptor!)
332 : *
333 : * returns -1 if failed
334 : */
335 : int
336 12 : inv_drop(Oid lobjId)
337 : {
338 : ObjectAddress object;
339 :
340 : /*
341 : * Delete any comments and dependencies on the large object
342 : */
343 12 : object.classId = LargeObjectRelationId;
344 12 : object.objectId = lobjId;
345 12 : object.objectSubId = 0;
346 12 : performDeletion(&object, DROP_CASCADE, 0);
347 :
348 : /*
349 : * Advance command counter so that tuple removal will be seen by later
350 : * large-object operations in this transaction.
351 : */
352 12 : CommandCounterIncrement();
353 :
354 12 : return 1;
355 : }
356 :
357 : /*
358 : * Determine size of a large object
359 : *
360 : * NOTE: LOs can contain gaps, just like Unix files. We actually return
361 : * the offset of the last byte + 1.
362 : */
363 : static uint64
364 16 : inv_getsize(LargeObjectDesc *obj_desc)
365 : {
366 16 : uint64 lastbyte = 0;
367 : ScanKeyData skey[1];
368 : SysScanDesc sd;
369 : HeapTuple tuple;
370 :
371 16 : Assert(PointerIsValid(obj_desc));
372 :
373 16 : open_lo_relation();
374 :
375 16 : ScanKeyInit(&skey[0],
376 : Anum_pg_largeobject_loid,
377 : BTEqualStrategyNumber, F_OIDEQ,
378 : ObjectIdGetDatum(obj_desc->id));
379 :
380 16 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
381 : obj_desc->snapshot, 1, skey);
382 :
383 : /*
384 : * Because the pg_largeobject index is on both loid and pageno, but we
385 : * constrain only loid, a backwards scan should visit all pages of the
386 : * large object in reverse pageno order. So, it's sufficient to examine
387 : * the first valid tuple (== last valid page).
388 : */
389 16 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
390 16 : if (HeapTupleIsValid(tuple))
391 : {
392 : Form_pg_largeobject data;
393 : bytea *datafield;
394 : int len;
395 : bool pfreeit;
396 :
397 16 : if (HeapTupleHasNulls(tuple)) /* paranoia */
398 0 : elog(ERROR, "null field found in pg_largeobject");
399 16 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
400 16 : getdatafield(data, &datafield, &len, &pfreeit);
401 16 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
402 16 : if (pfreeit)
403 3 : pfree(datafield);
404 : }
405 :
406 16 : systable_endscan_ordered(sd);
407 :
408 16 : return lastbyte;
409 : }
410 :
411 : int64
412 34 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
413 : {
414 : int64 newoffset;
415 :
416 34 : Assert(PointerIsValid(obj_desc));
417 :
418 : /*
419 : * Note: overflow in the additions is possible, but since we will reject
420 : * negative results, we don't need any extra test for that.
421 : */
422 34 : switch (whence)
423 : {
424 : case SEEK_SET:
425 15 : newoffset = offset;
426 15 : break;
427 : case SEEK_CUR:
428 3 : newoffset = obj_desc->offset + offset;
429 3 : break;
430 : case SEEK_END:
431 16 : newoffset = inv_getsize(obj_desc) + offset;
432 16 : break;
433 : default:
434 0 : ereport(ERROR,
435 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
436 : errmsg("invalid whence setting: %d", whence)));
437 : newoffset = 0; /* keep compiler quiet */
438 : break;
439 : }
440 :
441 : /*
442 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
443 : * in translatable strings; doing better is not worth the trouble
444 : */
445 34 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
446 0 : ereport(ERROR,
447 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
448 : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
449 : newoffset)));
450 :
451 34 : obj_desc->offset = newoffset;
452 34 : return newoffset;
453 : }
454 :
455 : int64
456 8 : inv_tell(LargeObjectDesc *obj_desc)
457 : {
458 8 : Assert(PointerIsValid(obj_desc));
459 :
460 8 : return obj_desc->offset;
461 : }
462 :
463 : int
464 189 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
465 : {
466 189 : int nread = 0;
467 : int64 n;
468 : int64 off;
469 : int len;
470 189 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
471 : uint64 pageoff;
472 : ScanKeyData skey[2];
473 : SysScanDesc sd;
474 : HeapTuple tuple;
475 :
476 189 : Assert(PointerIsValid(obj_desc));
477 189 : Assert(buf != NULL);
478 :
479 189 : if (nbytes <= 0)
480 0 : return 0;
481 :
482 189 : open_lo_relation();
483 :
484 189 : ScanKeyInit(&skey[0],
485 : Anum_pg_largeobject_loid,
486 : BTEqualStrategyNumber, F_OIDEQ,
487 : ObjectIdGetDatum(obj_desc->id));
488 :
489 189 : ScanKeyInit(&skey[1],
490 : Anum_pg_largeobject_pageno,
491 : BTGreaterEqualStrategyNumber, F_INT4GE,
492 : Int32GetDatum(pageno));
493 :
494 189 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
495 : obj_desc->snapshot, 2, skey);
496 :
497 1860 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
498 : {
499 : Form_pg_largeobject data;
500 : bytea *datafield;
501 : bool pfreeit;
502 :
503 1659 : if (HeapTupleHasNulls(tuple)) /* paranoia */
504 0 : elog(ERROR, "null field found in pg_largeobject");
505 1659 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
506 :
507 : /*
508 : * We expect the indexscan will deliver pages in order. However,
509 : * there may be missing pages if the LO contains unwritten "holes". We
510 : * want missing sections to read out as zeroes.
511 : */
512 1659 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
513 1659 : if (pageoff > obj_desc->offset)
514 : {
515 2 : n = pageoff - obj_desc->offset;
516 2 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
517 2 : MemSet(buf + nread, 0, n);
518 2 : nread += n;
519 2 : obj_desc->offset += n;
520 : }
521 :
522 1659 : if (nread < nbytes)
523 : {
524 1658 : Assert(obj_desc->offset >= pageoff);
525 1658 : off = (int) (obj_desc->offset - pageoff);
526 1658 : Assert(off >= 0 && off < LOBLKSIZE);
527 :
528 1658 : getdatafield(data, &datafield, &len, &pfreeit);
529 1658 : if (len > off)
530 : {
531 1655 : n = len - off;
532 1655 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
533 1655 : memcpy(buf + nread, VARDATA(datafield) + off, n);
534 1655 : nread += n;
535 1655 : obj_desc->offset += n;
536 : }
537 1658 : if (pfreeit)
538 1648 : pfree(datafield);
539 : }
540 :
541 1659 : if (nread >= nbytes)
542 177 : break;
543 : }
544 :
545 189 : systable_endscan_ordered(sd);
546 :
547 189 : return nread;
548 : }
549 :
550 : int
551 258 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
552 : {
553 258 : int nwritten = 0;
554 : int n;
555 : int off;
556 : int len;
557 258 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
558 : ScanKeyData skey[2];
559 : SysScanDesc sd;
560 : HeapTuple oldtuple;
561 : Form_pg_largeobject olddata;
562 : bool neednextpage;
563 : bytea *datafield;
564 : bool pfreeit;
565 : union
566 : {
567 : bytea hdr;
568 : /* this is to make the union big enough for a LO data chunk: */
569 : char data[LOBLKSIZE + VARHDRSZ];
570 : /* ensure union is aligned well enough: */
571 : int32 align_it;
572 : } workbuf;
573 258 : char *workb = VARDATA(&workbuf.hdr);
574 : HeapTuple newtup;
575 : Datum values[Natts_pg_largeobject];
576 : bool nulls[Natts_pg_largeobject];
577 : bool replace[Natts_pg_largeobject];
578 : CatalogIndexState indstate;
579 :
580 258 : Assert(PointerIsValid(obj_desc));
581 258 : Assert(buf != NULL);
582 :
583 : /* enforce writability because snapshot is probably wrong otherwise */
584 258 : Assert(obj_desc->flags & IFS_WRLOCK);
585 :
586 258 : if (nbytes <= 0)
587 0 : return 0;
588 :
589 : /* this addition can't overflow because nbytes is only int32 */
590 258 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
591 0 : ereport(ERROR,
592 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
593 : errmsg("invalid large object write request size: %d",
594 : nbytes)));
595 :
596 258 : open_lo_relation();
597 :
598 258 : indstate = CatalogOpenIndexes(lo_heap_r);
599 :
600 258 : ScanKeyInit(&skey[0],
601 : Anum_pg_largeobject_loid,
602 : BTEqualStrategyNumber, F_OIDEQ,
603 : ObjectIdGetDatum(obj_desc->id));
604 :
605 258 : ScanKeyInit(&skey[1],
606 : Anum_pg_largeobject_pageno,
607 : BTGreaterEqualStrategyNumber, F_INT4GE,
608 : Int32GetDatum(pageno));
609 :
610 258 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
611 : obj_desc->snapshot, 2, skey);
612 :
613 258 : oldtuple = NULL;
614 258 : olddata = NULL;
615 258 : neednextpage = true;
616 :
617 1840 : while (nwritten < nbytes)
618 : {
619 : /*
620 : * If possible, get next pre-existing page of the LO. We expect the
621 : * indexscan will deliver these in order --- but there may be holes.
622 : */
623 1324 : if (neednextpage)
624 : {
625 259 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
626 : {
627 4 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
628 0 : elog(ERROR, "null field found in pg_largeobject");
629 4 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
630 4 : Assert(olddata->pageno >= pageno);
631 : }
632 259 : neednextpage = false;
633 : }
634 :
635 : /*
636 : * If we have a pre-existing page, see if it is the page we want to
637 : * write, or a later one.
638 : */
639 1324 : if (olddata != NULL && olddata->pageno == pageno)
640 : {
641 : /*
642 : * Update an existing page with fresh data.
643 : *
644 : * First, load old data into workbuf
645 : */
646 4 : getdatafield(olddata, &datafield, &len, &pfreeit);
647 4 : memcpy(workb, VARDATA(datafield), len);
648 4 : if (pfreeit)
649 3 : pfree(datafield);
650 :
651 : /*
652 : * Fill any hole
653 : */
654 4 : off = (int) (obj_desc->offset % LOBLKSIZE);
655 4 : if (off > len)
656 0 : MemSet(workb + len, 0, off - len);
657 :
658 : /*
659 : * Insert appropriate portion of new data
660 : */
661 4 : n = LOBLKSIZE - off;
662 4 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
663 4 : memcpy(workb + off, buf + nwritten, n);
664 4 : nwritten += n;
665 4 : obj_desc->offset += n;
666 4 : off += n;
667 : /* compute valid length of new page */
668 4 : len = (len >= off) ? len : off;
669 4 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
670 :
671 : /*
672 : * Form and insert updated tuple
673 : */
674 4 : memset(values, 0, sizeof(values));
675 4 : memset(nulls, false, sizeof(nulls));
676 4 : memset(replace, false, sizeof(replace));
677 4 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
678 4 : replace[Anum_pg_largeobject_data - 1] = true;
679 4 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
680 : values, nulls, replace);
681 4 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
682 : indstate);
683 4 : heap_freetuple(newtup);
684 :
685 : /*
686 : * We're done with this old page.
687 : */
688 4 : oldtuple = NULL;
689 4 : olddata = NULL;
690 4 : neednextpage = true;
691 : }
692 : else
693 : {
694 : /*
695 : * Write a brand new page.
696 : *
697 : * First, fill any hole
698 : */
699 1320 : off = (int) (obj_desc->offset % LOBLKSIZE);
700 1320 : if (off > 0)
701 1 : MemSet(workb, 0, off);
702 :
703 : /*
704 : * Insert appropriate portion of new data
705 : */
706 1320 : n = LOBLKSIZE - off;
707 1320 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
708 1320 : memcpy(workb + off, buf + nwritten, n);
709 1320 : nwritten += n;
710 1320 : obj_desc->offset += n;
711 : /* compute valid length of new page */
712 1320 : len = off + n;
713 1320 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
714 :
715 : /*
716 : * Form and insert updated tuple
717 : */
718 1320 : memset(values, 0, sizeof(values));
719 1320 : memset(nulls, false, sizeof(nulls));
720 1320 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
721 1320 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
722 1320 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
723 1320 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
724 1320 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
725 1320 : heap_freetuple(newtup);
726 : }
727 1324 : pageno++;
728 : }
729 :
730 258 : systable_endscan_ordered(sd);
731 :
732 258 : CatalogCloseIndexes(indstate);
733 :
734 : /*
735 : * Advance command counter so that my tuple updates will be seen by later
736 : * large-object operations in this transaction.
737 : */
738 258 : CommandCounterIncrement();
739 :
740 258 : return nwritten;
741 : }
742 :
743 : void
744 7 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
745 : {
746 7 : int32 pageno = (int32) (len / LOBLKSIZE);
747 : int32 off;
748 : ScanKeyData skey[2];
749 : SysScanDesc sd;
750 : HeapTuple oldtuple;
751 : Form_pg_largeobject olddata;
752 : union
753 : {
754 : bytea hdr;
755 : /* this is to make the union big enough for a LO data chunk: */
756 : char data[LOBLKSIZE + VARHDRSZ];
757 : /* ensure union is aligned well enough: */
758 : int32 align_it;
759 : } workbuf;
760 7 : char *workb = VARDATA(&workbuf.hdr);
761 : HeapTuple newtup;
762 : Datum values[Natts_pg_largeobject];
763 : bool nulls[Natts_pg_largeobject];
764 : bool replace[Natts_pg_largeobject];
765 : CatalogIndexState indstate;
766 :
767 7 : Assert(PointerIsValid(obj_desc));
768 :
769 : /* enforce writability because snapshot is probably wrong otherwise */
770 7 : Assert(obj_desc->flags & IFS_WRLOCK);
771 :
772 : /*
773 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
774 : * in translatable strings; doing better is not worth the trouble
775 : */
776 7 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
777 0 : ereport(ERROR,
778 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
779 : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
780 : len)));
781 :
782 7 : open_lo_relation();
783 :
784 7 : indstate = CatalogOpenIndexes(lo_heap_r);
785 :
786 : /*
787 : * Set up to find all pages with desired loid and pageno >= target
788 : */
789 7 : ScanKeyInit(&skey[0],
790 : Anum_pg_largeobject_loid,
791 : BTEqualStrategyNumber, F_OIDEQ,
792 : ObjectIdGetDatum(obj_desc->id));
793 :
794 7 : ScanKeyInit(&skey[1],
795 : Anum_pg_largeobject_pageno,
796 : BTGreaterEqualStrategyNumber, F_INT4GE,
797 : Int32GetDatum(pageno));
798 :
799 7 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
800 : obj_desc->snapshot, 2, skey);
801 :
802 : /*
803 : * If possible, get the page the truncation point is in. The truncation
804 : * point may be beyond the end of the LO or in a hole.
805 : */
806 7 : olddata = NULL;
807 7 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
808 : {
809 4 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
810 0 : elog(ERROR, "null field found in pg_largeobject");
811 4 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
812 4 : Assert(olddata->pageno >= pageno);
813 : }
814 :
815 : /*
816 : * If we found the page of the truncation point we need to truncate the
817 : * data in it. Otherwise if we're in a hole, we need to create a page to
818 : * mark the end of data.
819 : */
820 7 : if (olddata != NULL && olddata->pageno == pageno)
821 2 : {
822 : /* First, load old data into workbuf */
823 : bytea *datafield;
824 : int pagelen;
825 : bool pfreeit;
826 :
827 2 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
828 2 : memcpy(workb, VARDATA(datafield), pagelen);
829 2 : if (pfreeit)
830 1 : pfree(datafield);
831 :
832 : /*
833 : * Fill any hole
834 : */
835 2 : off = len % LOBLKSIZE;
836 2 : if (off > pagelen)
837 1 : MemSet(workb + pagelen, 0, off - pagelen);
838 :
839 : /* compute length of new page */
840 2 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
841 :
842 : /*
843 : * Form and insert updated tuple
844 : */
845 2 : memset(values, 0, sizeof(values));
846 2 : memset(nulls, false, sizeof(nulls));
847 2 : memset(replace, false, sizeof(replace));
848 2 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
849 2 : replace[Anum_pg_largeobject_data - 1] = true;
850 2 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
851 : values, nulls, replace);
852 2 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
853 : indstate);
854 2 : heap_freetuple(newtup);
855 : }
856 : else
857 : {
858 : /*
859 : * If the first page we found was after the truncation point, we're in
860 : * a hole that we'll fill, but we need to delete the later page
861 : * because the loop below won't visit it again.
862 : */
863 5 : if (olddata != NULL)
864 : {
865 2 : Assert(olddata->pageno > pageno);
866 2 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
867 : }
868 :
869 : /*
870 : * Write a brand new page.
871 : *
872 : * Fill the hole up to the truncation point
873 : */
874 5 : off = len % LOBLKSIZE;
875 5 : if (off > 0)
876 5 : MemSet(workb, 0, off);
877 :
878 : /* compute length of new page */
879 5 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880 :
881 : /*
882 : * Form and insert new tuple
883 : */
884 5 : memset(values, 0, sizeof(values));
885 5 : memset(nulls, false, sizeof(nulls));
886 5 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
887 5 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
888 5 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
889 5 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
890 5 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
891 5 : heap_freetuple(newtup);
892 : }
893 :
894 : /*
895 : * Delete any pages after the truncation point. If the initial search
896 : * didn't find a page, then of course there's nothing more to do.
897 : */
898 7 : if (olddata != NULL)
899 : {
900 9 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
901 : {
902 1 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
903 : }
904 : }
905 :
906 7 : systable_endscan_ordered(sd);
907 :
908 7 : CatalogCloseIndexes(indstate);
909 :
910 : /*
911 : * Advance command counter so that tuple updates will be seen by later
912 : * large-object operations in this transaction.
913 : */
914 7 : CommandCounterIncrement();
915 7 : }
|