LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 284 304 93.4 %
Date: 2017-09-29 13:40:31 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/genam.h"
      36             : #include "access/heapam.h"
      37             : #include "access/sysattr.h"
      38             : #include "access/tuptoaster.h"
      39             : #include "access/xact.h"
      40             : #include "catalog/dependency.h"
      41             : #include "catalog/indexing.h"
      42             : #include "catalog/objectaccess.h"
      43             : #include "catalog/pg_largeobject.h"
      44             : #include "catalog/pg_largeobject_metadata.h"
      45             : #include "libpq/libpq-fs.h"
      46             : #include "miscadmin.h"
      47             : #include "storage/large_object.h"
      48             : #include "utils/fmgroids.h"
      49             : #include "utils/rel.h"
      50             : #include "utils/snapmgr.h"
      51             : #include "utils/tqual.h"
      52             : 
      53             : 
      54             : /*
      55             :  * All accesses to pg_largeobject and its index make use of a single Relation
      56             :  * reference, so that we only need to open pg_largeobject once per transaction.
      57             :  * To avoid problems when the first such reference occurs inside a
      58             :  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
      59             :  * the Relation reference to TopTransactionResourceOwner.
      60             :  */
      61             : static Relation lo_heap_r = NULL;   /* pg_largeobject heap; set by open_lo_relation() */
      62             : static Relation lo_index_r = NULL;  /* pg_largeobject's index; set by open_lo_relation() */
      63             : 
      64             : 
      65             : /*
      66             :  * Open pg_largeobject and its index, if not already done in current xact.
      67             :  * On return both lo_heap_r and lo_index_r are set. */
      68             : static void
      69         470 : open_lo_relation(void)
      70             : {
      71             :     ResourceOwner currentOwner;
      72             : 
      73         470 :     if (lo_heap_r && lo_index_r)
      74         906 :         return;                 /* already open in current xact */
      75             : 
      76             :     /* Arrange for the top xact to own these relation references */
      77          34 :     currentOwner = CurrentResourceOwner;
      78          34 :     PG_TRY();
      79             :     {
      80          34 :         CurrentResourceOwner = TopTransactionResourceOwner;
      81             : 
      82             :         /* Use RowExclusiveLock since we might either read or write */
      83          34 :         if (lo_heap_r == NULL)  /* each reference is checked separately, so only a missing one is opened */
      84          34 :             lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
      85          34 :         if (lo_index_r == NULL)
      86          34 :             lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      87             :     }
      88           0 :     PG_CATCH();
      89             :     {
      90             :         /* Ensure CurrentResourceOwner is restored on error */
      91           0 :         CurrentResourceOwner = currentOwner;
      92           0 :         PG_RE_THROW();
      93             :     }
      94          34 :     PG_END_TRY();
      95          34 :     CurrentResourceOwner = currentOwner;    /* normal path: put the caller's owner back */
      96             : }
      97             : 
      98             : /*
      99             :  * Clean up at main transaction end: close the cached relation references when isCommit is true, and forget them in either case.
     100             :  */
     101             : void
     102          54 : close_lo_relation(bool isCommit)
     103             : {
     104          54 :     if (lo_heap_r || lo_index_r)
     105             :     {
     106             :         /*
     107             :          * Only bother to close if committing; else abort cleanup will handle
     108             :          * it
     109             :          */
     110          34 :         if (isCommit)
     111             :         {
     112             :             ResourceOwner currentOwner;
     113             : 
     114          33 :             currentOwner = CurrentResourceOwner;
     115          33 :             PG_TRY();
     116             :             {
     117          33 :                 CurrentResourceOwner = TopTransactionResourceOwner; /* same owner that open_lo_relation() used */
     118             : 
     119          33 :                 if (lo_index_r)
     120          33 :                     index_close(lo_index_r, NoLock);    /* NoLock: the lock is not released here */
     121          33 :                 if (lo_heap_r)
     122          33 :                     heap_close(lo_heap_r, NoLock);
     123             :             }
     124           0 :             PG_CATCH();
     125             :             {
     126             :                 /* Ensure CurrentResourceOwner is restored on error */
     127           0 :                 CurrentResourceOwner = currentOwner;
     128           0 :                 PG_RE_THROW();
     129             :             }
     130          33 :             PG_END_TRY();
     131          33 :             CurrentResourceOwner = currentOwner;
     132             :         }
     133          34 :         lo_heap_r = NULL;   /* reset on both commit and abort */
     134          34 :         lo_index_r = NULL;
     135             :     }
     136          54 : }
     137             : 
     138             : 
     139             : /*
     140             :  * Same as pg_largeobject.c's LargeObjectExists(), except that the snapshot
     141             :  * to read with can be specified.  Returns true if the scan of pg_largeobject_metadata finds a tuple whose OID equals loid.
     142             :  */
     143             : static bool
     144          48 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     145             : {
     146             :     Relation    pg_lo_meta;
     147             :     ScanKeyData skey[1];
     148             :     SysScanDesc sd;
     149             :     HeapTuple   tuple;
     150          48 :     bool        retval = false;
     151             : 
     152          48 :     ScanKeyInit(&skey[0],   /* match on the tuple's OID column */
     153             :                 ObjectIdAttributeNumber,
     154             :                 BTEqualStrategyNumber, F_OIDEQ,
     155             :                 ObjectIdGetDatum(loid));
     156             : 
     157          48 :     pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
     158             :                            AccessShareLock);
     159             : 
     160          48 :     sd = systable_beginscan(pg_lo_meta,
     161             :                             LargeObjectMetadataOidIndexId, true,
     162             :                             snapshot, 1, skey);
     163             : 
     164          48 :     tuple = systable_getnext(sd);   /* only the first match matters */
     165          48 :     if (HeapTupleIsValid(tuple))
     166          48 :         retval = true;
     167             : 
     168          48 :     systable_endscan(sd);
     169             : 
     170          48 :     heap_close(pg_lo_meta, AccessShareLock);
     171             : 
     172          48 :     return retval;
     173             : }
     174             : 
     175             : 
     176             : /*
     177             :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     178             :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     179             :  * data length, and an indication of whether to pfree the data pointer.  The outputs are stored only after the length check passes.
     180             :  */
     181             : static void
     182        1680 : getdatafield(Form_pg_largeobject tuple,
     183             :              bytea **pdatafield,
     184             :              int *plen,
     185             :              bool *pfreeit)
     186             : {
     187             :     bytea      *datafield;
     188             :     int         len;
     189             :     bool        freeit;
     190             : 
     191        1680 :     datafield = &(tuple->data); /* see note at top of file */
     192        1680 :     freeit = false;
     193        1680 :     if (VARATT_IS_EXTENDED(datafield))
     194             :     {
     195        1655 :         datafield = (bytea *)
     196             :             heap_tuple_untoast_attr((struct varlena *) datafield);
     197        1655 :         freeit = true;  /* datafield no longer points into the tuple; caller is told to pfree it */
     198             :     }
     199        1680 :     len = VARSIZE(datafield) - VARHDRSZ;    /* payload length, excluding the varlena header */
     200        1680 :     if (len < 0 || len > LOBLKSIZE) /* anything outside [0, LOBLKSIZE] is reported as corruption */
     201           0 :         ereport(ERROR,
     202             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     203             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     204             :                         tuple->loid, tuple->pageno, len)));
     205        1680 :     *pdatafield = datafield;
     206        1680 :     *plen = len;
     207        1680 :     *pfreeit = freeit;
     208        1680 : }
     209             : 
     210             : 
     211             : /*
     212             :  *  inv_create -- create a new large object
     213             :  *
     214             :  *  Arguments:
     215             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     216             :  *
     217             :  *  Returns:
     218             :  *    OID of new object
     219             :  *
     220             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     221             :  * in use.
     222             :  */
     223             : Oid
     224          15 : inv_create(Oid lobjId)
     225             : {
     226             :     Oid         lobjId_new;
     227             : 
     228             :     /*
     229             :      * Create a new largeobject with empty data pages
     230             :      */
     231          15 :     lobjId_new = LargeObjectCreate(lobjId);
     232             : 
     233             :     /*
     234             :      * Record a dependency on the owner of the large object.
     235             :      *
     236             :      * The reason why we use LargeObjectRelationId instead of
     237             :      * LargeObjectMetadataRelationId here is to provide backward compatibility
     238             :      * for applications that rely on knowledge of the internal layout of the
     239             :      * system catalogs.  The OID of pg_largeobject_metadata and the loid of
     240             :      * pg_largeobject are the same value, so there is no actual difference here.
     241             :      */
     242          15 :     recordDependencyOnOwner(LargeObjectRelationId,
     243             :                             lobjId_new, GetUserId());
     244             : 
     245             :     /* Post creation hook for new large object */
     246          15 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     247             : 
     248             :     /*
     249             :      * Advance command counter to make new tuple visible to later operations.
     250             :      */
     251          15 :     CommandCounterIncrement();
     252             : 
     253          15 :     return lobjId_new;
     254             : }
     255             : 
     256             : /*
     257             :  *  inv_open -- access an existing large object.
     258             :  *      flags must include INV_WRITE or INV_READ, else an error is raised; an error is also raised if the object does not exist.
     259             :  *      Returns:
     260             :  *        Large object descriptor, appropriately filled in.  The descriptor
     261             :  *        and subsidiary data are allocated in the specified memory context,
     262             :  *        which must be suitably long-lived for the caller's purposes.
     263             :  */
     264             : LargeObjectDesc *
     265          48 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     266             : {
     267             :     LargeObjectDesc *retval;
     268          48 :     Snapshot    snapshot = NULL;
     269          48 :     int         descflags = 0;
     270             : 
     271          48 :     if (flags & INV_WRITE)  /* INV_WRITE takes precedence when both bits are set */
     272             :     {
     273          25 :         snapshot = NULL;        /* instantaneous MVCC snapshot */
     274          25 :         descflags = IFS_WRLOCK | IFS_RDLOCK;
     275             :     }
     276          23 :     else if (flags & INV_READ)
     277             :     {
     278          23 :         snapshot = GetActiveSnapshot();
     279          23 :         descflags = IFS_RDLOCK;
     280             :     }
     281             :     else
     282           0 :         ereport(ERROR,
     283             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     284             :                  errmsg("invalid flags for opening a large object: %d",
     285             :                         flags)));
     286             : 
     287             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     288          48 :     if (!myLargeObjectExists(lobjId, snapshot))
     289           0 :         ereport(ERROR,
     290             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     291             :                  errmsg("large object %u does not exist", lobjId)));
     292             : 
     293             :     /*
     294             :      * We must register the snapshot in TopTransaction's resowner, because it
     295             :      * must stay alive until the LO is closed rather than until the current
     296             :      * portal shuts down. Do this after checking that the LO exists, to avoid
     297             :      * leaking the snapshot if an error is thrown.
     298             :      */
     299          48 :     if (snapshot)   /* non-NULL only on the INV_READ path */
     300          23 :         snapshot = RegisterSnapshotOnOwner(snapshot,
     301             :                                            TopTransactionResourceOwner);
     302             : 
     303             :     /* All set, create a descriptor */
     304          48 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     305             :                                                     sizeof(LargeObjectDesc));
     306          48 :     retval->id = lobjId;
     307          48 :     retval->subid = GetCurrentSubTransactionId();
     308          48 :     retval->offset = 0; /* a freshly opened descriptor starts at offset 0 */
     309          48 :     retval->snapshot = snapshot;    /* stays NULL for descriptors opened with INV_WRITE */
     310          48 :     retval->flags = descflags;
     311             : 
     312          48 :     return retval;
     313             : }
     314             : 
     315             : /*
     316             :  * Closes a large object descriptor previously made by inv_open(), and
     317             :  * releases the long-term memory used by it.  obj_desc must not be used after this call, since it is pfree'd here.
     318             :  */
     319             : void
     320          36 : inv_close(LargeObjectDesc *obj_desc)
     321             : {
     322          36 :     Assert(PointerIsValid(obj_desc));
     323             : 
     324          36 :     UnregisterSnapshotFromOwner(obj_desc->snapshot, /* NOTE(review): snapshot is NULL for INV_WRITE descriptors; presumably the callee tolerates NULL -- confirm */
     325             :                                 TopTransactionResourceOwner);
     326             : 
     327          36 :     pfree(obj_desc);
     328          36 : }
     329             : 
     330             : /*
     331             :  * Destroys an existing large object (not to be confused with a descriptor!)
     332             :  *
     333             :  * Always returns 1; there is no code path here that returns -1.
     334             :  */
     335             : int
     336          12 : inv_drop(Oid lobjId)
     337             : {
     338             :     ObjectAddress object;
     339             : 
     340             :     /*
     341             :      * Delete any comments and dependencies on the large object
     342             :      */
     343          12 :     object.classId = LargeObjectRelationId;
     344          12 :     object.objectId = lobjId;
     345          12 :     object.objectSubId = 0;
     346          12 :     performDeletion(&object, DROP_CASCADE, 0);  /* NOTE(review): failures presumably surface as errors raised by the callee -- confirm */
     347             : 
     348             :     /*
     349             :      * Advance command counter so that tuple removal will be seen by later
     350             :      * large-object operations in this transaction.
     351             :      */
     352          12 :     CommandCounterIncrement();
     353             : 
     354          12 :     return 1;
     355             : }
     356             : 
     357             : /*
     358             :  * Determine size of a large object
     359             :  *
     360             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     361             :  * the offset of the last byte + 1.  Returns 0 when the scan finds no page for the object.
     362             :  */
     363             : static uint64
     364          16 : inv_getsize(LargeObjectDesc *obj_desc)
     365             : {
     366          16 :     uint64      lastbyte = 0;   /* stays 0 if no tuple is found */
     367             :     ScanKeyData skey[1];
     368             :     SysScanDesc sd;
     369             :     HeapTuple   tuple;
     370             : 
     371          16 :     Assert(PointerIsValid(obj_desc));
     372             : 
     373          16 :     open_lo_relation();
     374             : 
     375          16 :     ScanKeyInit(&skey[0],
     376             :                 Anum_pg_largeobject_loid,
     377             :                 BTEqualStrategyNumber, F_OIDEQ,
     378             :                 ObjectIdGetDatum(obj_desc->id));
     379             : 
     380          16 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     381             :                                     obj_desc->snapshot, 1, skey);
     382             : 
     383             :     /*
     384             :      * Because the pg_largeobject index is on both loid and pageno, but we
     385             :      * constrain only loid, a backwards scan should visit all pages of the
     386             :      * large object in reverse pageno order.  So, it's sufficient to examine
     387             :      * the first valid tuple (== last valid page).
     388             :      */
     389          16 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     390          16 :     if (HeapTupleIsValid(tuple))
     391             :     {
     392             :         Form_pg_largeobject data;
     393             :         bytea      *datafield;
     394             :         int         len;
     395             :         bool        pfreeit;
     396             : 
     397          16 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     398           0 :             elog(ERROR, "null field found in pg_largeobject");
     399          16 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     400          16 :         getdatafield(data, &datafield, &len, &pfreeit);
     401          16 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len; /* start of the last page plus the bytes stored in it */
     402          16 :         if (pfreeit)
     403           3 :             pfree(datafield);
     404             :     }
     405             : 
     406          16 :     systable_endscan_ordered(sd);
     407             : 
     408          16 :     return lastbyte;
     409             : }
     410             : /* inv_seek -- set the descriptor's offset from offset and whence (SEEK_SET, SEEK_CUR or SEEK_END) and return the new offset; raises an error for an unknown whence or a target that is negative or above MAX_LARGE_OBJECT_SIZE. */
     411             : int64
     412          34 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     413             : {
     414             :     int64       newoffset;
     415             : 
     416          34 :     Assert(PointerIsValid(obj_desc));
     417             : 
     418             :     /*
     419             :      * Note: overflow in the additions is possible, but since we will reject
     420             :      * negative results, we don't need any extra test for that.  NOTE(review): in the SEEK_CUR case this relies on signed int64 overflow wrapping, which the C standard leaves undefined -- confirm the build guarantees it (e.g. -fwrapv).
     421             :      */
     422          34 :     switch (whence)
     423             :     {
     424             :         case SEEK_SET:
     425          15 :             newoffset = offset;
     426          15 :             break;
     427             :         case SEEK_CUR:
     428           3 :             newoffset = obj_desc->offset + offset;
     429           3 :             break;
     430             :         case SEEK_END:
     431          16 :             newoffset = inv_getsize(obj_desc) + offset; /* inv_getsize() returns uint64, so this sum is formed in unsigned arithmetic */
     432          16 :             break;
     433             :         default:
     434           0 :             ereport(ERROR,
     435             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     436             :                      errmsg("invalid whence setting: %d", whence)));
     437             :             newoffset = 0;      /* keep compiler quiet */
     438             :             break;
     439             :     }
     440             : 
     441             :     /*
     442             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     443             :      * in translatable strings; doing better is not worth the trouble
     444             :      */
     445          34 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     446           0 :         ereport(ERROR,
     447             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     448             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     449             :                                  newoffset)));
     450             : 
     451          34 :     obj_desc->offset = newoffset;   /* the descriptor is updated only after validation */
     452          34 :     return newoffset;
     453             : }
     454             : /* inv_tell -- return the descriptor's current offset; the descriptor is not modified. */
     455             : int64
     456           8 : inv_tell(LargeObjectDesc *obj_desc)
     457             : {
     458           8 :     Assert(PointerIsValid(obj_desc));
     459             : 
     460           8 :     return obj_desc->offset;
     461             : }
     462             : /* inv_read -- copy up to nbytes bytes, starting at the descriptor's current offset, into buf; missing pages read as zeroes.  Advances the offset by the amount stored and returns that amount (0 if nbytes <= 0). */
     463             : int
     464         189 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     465             : {
     466         189 :     int         nread = 0;
     467             :     int64       n;
     468             :     int64       off;
     469             :     int         len;
     470         189 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);    /* NOTE(review): dereferences obj_desc before the Assert below checks it */
     471             :     uint64      pageoff;
     472             :     ScanKeyData skey[2];
     473             :     SysScanDesc sd;
     474             :     HeapTuple   tuple;
     475             : 
     476         189 :     Assert(PointerIsValid(obj_desc));
     477         189 :     Assert(buf != NULL);
     478             : 
     479         189 :     if (nbytes <= 0)
     480           0 :         return 0;
     481             : 
     482         189 :     open_lo_relation();
     483             : 
     484         189 :     ScanKeyInit(&skey[0],
     485             :                 Anum_pg_largeobject_loid,
     486             :                 BTEqualStrategyNumber, F_OIDEQ,
     487             :                 ObjectIdGetDatum(obj_desc->id));
     488             : 
     489         189 :     ScanKeyInit(&skey[1],   /* start at the page containing the current offset */
     490             :                 Anum_pg_largeobject_pageno,
     491             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     492             :                 Int32GetDatum(pageno));
     493             : 
     494         189 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     495             :                                     obj_desc->snapshot, 2, skey);
     496             : 
     497        1860 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     498             :     {
     499             :         Form_pg_largeobject data;
     500             :         bytea      *datafield;
     501             :         bool        pfreeit;
     502             : 
     503        1659 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     504           0 :             elog(ERROR, "null field found in pg_largeobject");
     505        1659 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     506             : 
     507             :         /*
     508             :          * We expect the indexscan will deliver pages in order.  However,
     509             :          * there may be missing pages if the LO contains unwritten "holes". We
     510             :          * want missing sections to read out as zeroes.
     511             :          */
     512        1659 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     513        1659 :         if (pageoff > obj_desc->offset)
     514             :         {
     515           2 :             n = pageoff - obj_desc->offset; /* size of the gap before this page */
     516           2 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread); /* clamp to the space left in buf */
     517           2 :             MemSet(buf + nread, 0, n);
     518           2 :             nread += n;
     519           2 :             obj_desc->offset += n;
     520             :         }
     521             : 
     522        1659 :         if (nread < nbytes)
     523             :         {
     524        1658 :             Assert(obj_desc->offset >= pageoff);
     525        1658 :             off = (int) (obj_desc->offset - pageoff);   /* byte position within this page */
     526        1658 :             Assert(off >= 0 && off < LOBLKSIZE);
     527             : 
     528        1658 :             getdatafield(data, &datafield, &len, &pfreeit);
     529        1658 :             if (len > off)  /* nothing is copied when the page's data ends at or before off */
     530             :             {
     531        1655 :                 n = len - off;
     532        1655 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     533        1655 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     534        1655 :                 nread += n;
     535        1655 :                 obj_desc->offset += n;
     536             :             }
     537        1658 :             if (pfreeit)
     538        1648 :                 pfree(datafield);
     539             :         }
     540             : 
     541        1659 :         if (nread >= nbytes)
     542         177 :             break;
     543             :     }
     544             : 
     545         189 :     systable_endscan_ordered(sd);
     546             : 
     547         189 :     return nread;
     548             : }
     549             : 
     550             : int
     551         258 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     552             : {
     553         258 :     int         nwritten = 0;
     554             :     int         n;
     555             :     int         off;
     556             :     int         len;
     557         258 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     558             :     ScanKeyData skey[2];
     559             :     SysScanDesc sd;
     560             :     HeapTuple   oldtuple;
     561             :     Form_pg_largeobject olddata;
     562             :     bool        neednextpage;
     563             :     bytea      *datafield;
     564             :     bool        pfreeit;
     565             :     union
     566             :     {
     567             :         bytea       hdr;
     568             :         /* this is to make the union big enough for a LO data chunk: */
     569             :         char        data[LOBLKSIZE + VARHDRSZ];
     570             :         /* ensure union is aligned well enough: */
     571             :         int32       align_it;
     572             :     }           workbuf;
     573         258 :     char       *workb = VARDATA(&workbuf.hdr);
     574             :     HeapTuple   newtup;
     575             :     Datum       values[Natts_pg_largeobject];
     576             :     bool        nulls[Natts_pg_largeobject];
     577             :     bool        replace[Natts_pg_largeobject];
     578             :     CatalogIndexState indstate;
     579             : 
     580         258 :     Assert(PointerIsValid(obj_desc));
     581         258 :     Assert(buf != NULL);
     582             : 
     583             :     /* enforce writability because snapshot is probably wrong otherwise */
     584         258 :     Assert(obj_desc->flags & IFS_WRLOCK);
     585             : 
     586         258 :     if (nbytes <= 0)
     587           0 :         return 0;
     588             : 
     589             :     /* this addition can't overflow because nbytes is only int32 */
     590         258 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     591           0 :         ereport(ERROR,
     592             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     593             :                  errmsg("invalid large object write request size: %d",
     594             :                         nbytes)));
     595             : 
     596         258 :     open_lo_relation();
     597             : 
     598         258 :     indstate = CatalogOpenIndexes(lo_heap_r);
     599             : 
     600         258 :     ScanKeyInit(&skey[0],
     601             :                 Anum_pg_largeobject_loid,
     602             :                 BTEqualStrategyNumber, F_OIDEQ,
     603             :                 ObjectIdGetDatum(obj_desc->id));
     604             : 
     605         258 :     ScanKeyInit(&skey[1],
     606             :                 Anum_pg_largeobject_pageno,
     607             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     608             :                 Int32GetDatum(pageno));
     609             : 
     610         258 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     611             :                                     obj_desc->snapshot, 2, skey);
     612             : 
     613         258 :     oldtuple = NULL;
     614         258 :     olddata = NULL;
     615         258 :     neednextpage = true;
     616             : 
     617        1840 :     while (nwritten < nbytes)
     618             :     {
     619             :         /*
     620             :          * If possible, get next pre-existing page of the LO.  We expect the
     621             :          * indexscan will deliver these in order --- but there may be holes.
     622             :          */
     623        1324 :         if (neednextpage)
     624             :         {
     625         259 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     626             :             {
     627           4 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     628           0 :                     elog(ERROR, "null field found in pg_largeobject");
     629           4 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     630           4 :                 Assert(olddata->pageno >= pageno);
     631             :             }
     632         259 :             neednextpage = false;
     633             :         }
     634             : 
     635             :         /*
     636             :          * If we have a pre-existing page, see if it is the page we want to
     637             :          * write, or a later one.
     638             :          */
     639        1324 :         if (olddata != NULL && olddata->pageno == pageno)
     640             :         {
     641             :             /*
     642             :              * Update an existing page with fresh data.
     643             :              *
     644             :              * First, load old data into workbuf
     645             :              */
     646           4 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     647           4 :             memcpy(workb, VARDATA(datafield), len);
     648           4 :             if (pfreeit)
     649           3 :                 pfree(datafield);
     650             : 
     651             :             /*
     652             :              * Fill any hole
     653             :              */
     654           4 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     655           4 :             if (off > len)
     656           0 :                 MemSet(workb + len, 0, off - len);
     657             : 
     658             :             /*
     659             :              * Insert appropriate portion of new data
     660             :              */
     661           4 :             n = LOBLKSIZE - off;
     662           4 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     663           4 :             memcpy(workb + off, buf + nwritten, n);
     664           4 :             nwritten += n;
     665           4 :             obj_desc->offset += n;
     666           4 :             off += n;
     667             :             /* compute valid length of new page */
     668           4 :             len = (len >= off) ? len : off;
     669           4 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     670             : 
     671             :             /*
     672             :              * Form and insert updated tuple
     673             :              */
     674           4 :             memset(values, 0, sizeof(values));
     675           4 :             memset(nulls, false, sizeof(nulls));
     676           4 :             memset(replace, false, sizeof(replace));
     677           4 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     678           4 :             replace[Anum_pg_largeobject_data - 1] = true;
     679           4 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     680             :                                        values, nulls, replace);
     681           4 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     682             :                                        indstate);
     683           4 :             heap_freetuple(newtup);
     684             : 
     685             :             /*
     686             :              * We're done with this old page.
     687             :              */
     688           4 :             oldtuple = NULL;
     689           4 :             olddata = NULL;
     690           4 :             neednextpage = true;
     691             :         }
     692             :         else
     693             :         {
     694             :             /*
     695             :              * Write a brand new page.
     696             :              *
     697             :              * First, fill any hole
     698             :              */
     699        1320 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     700        1320 :             if (off > 0)
     701           1 :                 MemSet(workb, 0, off);
     702             : 
     703             :             /*
     704             :              * Insert appropriate portion of new data
     705             :              */
     706        1320 :             n = LOBLKSIZE - off;
     707        1320 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     708        1320 :             memcpy(workb + off, buf + nwritten, n);
     709        1320 :             nwritten += n;
     710        1320 :             obj_desc->offset += n;
     711             :             /* compute valid length of new page */
     712        1320 :             len = off + n;
     713        1320 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     714             : 
     715             :             /*
     716             :              * Form and insert new tuple
     717             :              */
     718        1320 :             memset(values, 0, sizeof(values));
     719        1320 :             memset(nulls, false, sizeof(nulls));
     720        1320 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     721        1320 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     722        1320 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     723        1320 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     724        1320 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     725        1320 :             heap_freetuple(newtup);
     726             :         }
     727        1324 :         pageno++;
     728             :     }
     729             : 
     730         258 :     systable_endscan_ordered(sd);
     731             : 
     732         258 :     CatalogCloseIndexes(indstate);
     733             : 
     734             :     /*
     735             :      * Advance command counter so that my tuple updates will be seen by later
     736             :      * large-object operations in this transaction.
     737             :      */
     738         258 :     CommandCounterIncrement();
     739             : 
     740         258 :     return nwritten;
     741             : }
     742             : 
     743             : void
     744           7 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     745             : {
                           /*
                            * inv_truncate -- make the large object described by obj_desc end at
                            * byte offset len.
                            *
                            * obj_desc: large object descriptor; its id and snapshot drive the
                            *           catalog scan, and its flags are asserted to include
                            *           IFS_WRLOCK.
                            * len:      target length in bytes; a value below 0 or above
                            *           MAX_LARGE_OBJECT_SIZE is reported as
                            *           ERRCODE_INVALID_PARAMETER_VALUE.
                            *
                            * The page that contains offset len (page number len / LOBLKSIZE) is
                            * updated, or inserted if the scan does not return it, so that it
                            * holds exactly len % LOBLKSIZE bytes; bytes not covered by old data
                            * are zero.  Every other page the scan returns (page numbers at or
                            * beyond the target are requested) is deleted.  Nothing is returned;
                            * the command counter is advanced before the function exits.
                            */

                           /*
                            * pageno is derived from len before len is range-checked; it is not
                            * used until after that check further down.
                            */
     746           7 :     int32       pageno = (int32) (len / LOBLKSIZE);
     747             :     int32       off;
     748             :     ScanKeyData skey[2];
     749             :     SysScanDesc sd;
     750             :     HeapTuple   oldtuple;
     751             :     Form_pg_largeobject olddata;
     752             :     union
     753             :     {
     754             :         bytea       hdr;
     755             :         /* this is to make the union big enough for a LO data chunk: */
     756             :         char        data[LOBLKSIZE + VARHDRSZ];
     757             :         /* ensure union is aligned well enough: */
     758             :         int32       align_it;
     759             :     }           workbuf;
     760           7 :     char       *workb = VARDATA(&workbuf.hdr);
     761             :     HeapTuple   newtup;
     762             :     Datum       values[Natts_pg_largeobject];
     763             :     bool        nulls[Natts_pg_largeobject];
     764             :     bool        replace[Natts_pg_largeobject];
     765             :     CatalogIndexState indstate;
     766             : 
     767           7 :     Assert(PointerIsValid(obj_desc));
     768             : 
     769             :     /* enforce writability because snapshot is probably wrong otherwise */
     770           7 :     Assert(obj_desc->flags & IFS_WRLOCK);
                           /*
                            * NOTE(review): both checks above are Asserts only; presumably they
                            * are compiled out of non-assert builds, in which case callers must
                            * already have validated the descriptor and its write flag -- confirm
                            * at the call sites.
                            */
     771             : 
     772             :     /*
     773             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     774             :      * in translatable strings; doing better is not worth the trouble
     775             :      */
     776           7 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     777           0 :         ereport(ERROR,
     778             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     779             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     780             :                                  len)));
     781             : 
     782           7 :     open_lo_relation();
     783             : 
     784           7 :     indstate = CatalogOpenIndexes(lo_heap_r);
     785             : 
     786             :     /*
     787             :      * Set up to find all pages with desired loid and pageno >= target
     788             :      */
     789           7 :     ScanKeyInit(&skey[0],
     790             :                 Anum_pg_largeobject_loid,
     791             :                 BTEqualStrategyNumber, F_OIDEQ,
     792             :                 ObjectIdGetDatum(obj_desc->id));
     793             : 
     794           7 :     ScanKeyInit(&skey[1],
     795             :                 Anum_pg_largeobject_pageno,
     796             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     797             :                 Int32GetDatum(pageno));
     798             : 
     799           7 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     800             :                                     obj_desc->snapshot, 2, skey);
     801             : 
     802             :     /*
     803             :      * If possible, get the page the truncation point is in. The truncation
     804             :      * point may be beyond the end of the LO or in a hole.
     805             :      */
     806           7 :     olddata = NULL;
     807           7 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     808             :     {
     809           4 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     810           0 :             elog(ERROR, "null field found in pg_largeobject");
     811           4 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     812           4 :         Assert(olddata->pageno >= pageno);
     813             :     }
     814             : 
     815             :     /*
     816             :      * If we found the page of the truncation point we need to truncate the
     817             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     818             :      * mark the end of data.
     819             :      */
     820           7 :     if (olddata != NULL && olddata->pageno == pageno)
     821           2 :     {
     822             :         /* First, load old data into workbuf */
     823             :         bytea      *datafield;
     824             :         int         pagelen;
     825             :         bool        pfreeit;
     826             : 
                               /*
                                * getdatafield is defined outside this function.  As used here it
                                * hands back the page's data varlena, its payload length, and a flag
                                * telling us whether that varlena must be pfree'd afterwards.
                                * NOTE(review): the memcpy below assumes pagelen <= LOBLKSIZE so the
                                * payload fits in workbuf -- confirm that getdatafield enforces it.
                                */
     827           2 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     828           2 :         memcpy(workb, VARDATA(datafield), pagelen);
     829           2 :         if (pfreeit)
     830           1 :             pfree(datafield);
     831             : 
     832             :         /*
     833             :          * Fill any hole
     834             :          */
                               /*
                                * off is the truncation point's byte offset inside this page.  If the
                                * stored page is shorter than that, the gap up to off is zeroed.
                                */
     835           2 :         off = len % LOBLKSIZE;
     836           2 :         if (off > pagelen)
     837           1 :             MemSet(workb + pagelen, 0, off - pagelen);
     838             : 
     839             :         /* compute length of new page */
                               /*
                                * The varlena size is set to off payload bytes, so any old data that
                                * was copied in beyond off is simply left out of the new page.
                                */
     840           2 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     841             : 
     842             :         /*
     843             :          * Form and insert updated tuple
     844             :          */
                               /* only the data column is flagged in replace[] for replacement */
     845           2 :         memset(values, 0, sizeof(values));
     846           2 :         memset(nulls, false, sizeof(nulls));
     847           2 :         memset(replace, false, sizeof(replace));
     848           2 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     849           2 :         replace[Anum_pg_largeobject_data - 1] = true;
     850           2 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     851             :                                    values, nulls, replace);
     852           2 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     853             :                                    indstate);
     854           2 :         heap_freetuple(newtup);
     855             :     }
     856             :     else
     857             :     {
     858             :         /*
     859             :          * If the first page we found was after the truncation point, we're in
     860             :          * a hole that we'll fill, but we need to delete the later page
     861             :          * because the loop below won't visit it again.
     862             :          */
     863           5 :         if (olddata != NULL)
     864             :         {
     865           2 :             Assert(olddata->pageno > pageno);
     866           2 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     867             :         }
     868             : 
     869             :         /*
     870             :          * Write a brand new page.
     871             :          *
     872             :          * Fill the hole up to the truncation point
     873             :          */
     874           5 :         off = len % LOBLKSIZE;
     875           5 :         if (off > 0)
     876           5 :             MemSet(workb, 0, off);
                               /*
                                * When off is 0 nothing is zero-filled, and the page inserted below
                                * carries a zero-length payload.
                                */
     877             : 
     878             :         /* compute length of new page */
     879           5 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     880             : 
     881             :         /*
     882             :          * Form and insert new tuple
     883             :          */
     884           5 :         memset(values, 0, sizeof(values));
     885           5 :         memset(nulls, false, sizeof(nulls));
     886           5 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     887           5 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     888           5 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     889           5 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     890           5 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     891           5 :         heap_freetuple(newtup);
     892             :     }
     893             : 
     894             :     /*
     895             :      * Delete any pages after the truncation point.  If the initial search
     896             :      * didn't find a page, then of course there's nothing more to do.
     897             :      */
     898           7 :     if (olddata != NULL)
     899             :     {
                               /*
                                * olddata is never cleared after the first fetch, so it doubles as a
                                * "the scan returned at least one page" flag.  The first page was
                                * already updated or deleted above; every tuple the scan still
                                * yields is deleted here without inspecting its page number
                                * (presumably all lie past the target page, given the ordered scan
                                * -- confirm).
                                */
     900           9 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     901             :         {
     902           1 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     903             :         }
     904             :     }
     905             : 
     906           7 :     systable_endscan_ordered(sd);
     907             : 
     908           7 :     CatalogCloseIndexes(indstate);
     909             : 
     910             :     /*
     911             :      * Advance command counter so that tuple updates will be seen by later
     912             :      * large-object operations in this transaction.
     913             :      */
     914           7 :     CommandCounterIncrement();
     915           7 : }

Generated by: LCOV version 1.11