LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 67 289 23.2 %
Date: 2017-09-29 13:40:31 Functions: 13 29 44.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * commit_ts.c
       4             :  *      PostgreSQL commit timestamp manager
       5             :  *
       6             :  * This module is a pg_xact-like system that stores the commit timestamp
       7             :  * for each transaction.
       8             :  *
       9             :  * XLOG interactions: this module generates an XLOG record whenever a new
      10             :  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
      11             :  * generated for setting of values when the caller requests it; this allows
      12             :  * us to support values coming from places other than transaction commit.
      13             :  * Other writes of CommitTS come from recording of transaction commit in
      14             :  * xact.c, which generates its own XLOG records for these events and will
      15             :  * re-perform the status update on redo; so we need make no additional XLOG
      16             :  * entry here.
      17             :  *
      18             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      19             :  * Portions Copyright (c) 1994, Regents of the University of California
      20             :  *
      21             :  * src/backend/access/transam/commit_ts.c
      22             :  *
      23             :  *-------------------------------------------------------------------------
      24             :  */
      25             : #include "postgres.h"
      26             : 
      27             : #include "access/commit_ts.h"
      28             : #include "access/htup_details.h"
      29             : #include "access/slru.h"
      30             : #include "access/transam.h"
      31             : #include "catalog/pg_type.h"
      32             : #include "funcapi.h"
      33             : #include "miscadmin.h"
      34             : #include "pg_trace.h"
      35             : #include "storage/shmem.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/snapmgr.h"
      38             : #include "utils/timestamp.h"
      39             : 
      40             : /*
      41             :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      42             :  * everywhere else in Postgres.
      43             :  *
      44             :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      45             :  * CommitTs page numbering also wraps around at
      46             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      47             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      48             :  * explicit notice of that fact in this module, except when comparing segment
      49             :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      50             :  */
      51             : 
      52             : /*
      53             :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      54             :  * the largest possible file name is more than 5 chars long; see
      55             :  * SlruScanDirectory.
      56             :  */
      57             : typedef struct CommitTimestampEntry
      58             : {
      59             :     TimestampTz time;           /* commit timestamp of the transaction */
      60             :     RepOriginId nodeid;         /* origin node of the transaction */
      61             : } CommitTimestampEntry;
      62             : 
      63             : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      64             :                                     sizeof(RepOriginId))    /* bytes copied per entry: ends right after nodeid */
      65             : 
      66             : #define COMMIT_TS_XACTS_PER_PAGE \
      67             :     (BLCKSZ / SizeOfCommitTimestampEntry)   /* whole entries that fit in one page */
      68             : 
      69             : #define TransactionIdToCTsPage(xid) \
      70             :     ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)  /* page number holding xid's entry */
      71             : #define TransactionIdToCTsEntry(xid)    \
      72             :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)  /* index of xid's entry within that page */
      73             : 
      74             : /*
      75             :  * Link to shared-memory data structures for CommitTs control
      76             :  */
      77             : static SlruCtlData CommitTsCtlData;
      78             : 
      79             : #define CommitTsCtl (&CommitTsCtlData)  /* pointer form, as passed to the SimpleLru* routines */
      80             : 
      81             : /*
      82             :  * We keep a cache of the last value set in shared memory.
      83             :  *
      84             :  * This is also a good place to keep the activation status.  We keep this
      85             :  * separate from the GUC so that the standby can activate the module if the
      86             :  * primary has it active independently of the value of the GUC.
      87             :  *
      88             :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
      89             :  * without acquiring the lock; where this happens, a comment explains the
      90             :  * rationale for it.
      91             :  */
      92             : typedef struct CommitTimestampShared
      93             : {
      94             :     TransactionId xidLastCommit;    /* xid whose data is cached below */
      95             :     CommitTimestampEntry dataLastCommit;    /* timestamp and origin node last set */
      96             :     bool        commitTsActive;     /* is the module currently active? */
      97             : } CommitTimestampShared;
      98             : 
      99             : CommitTimestampShared *commitTsShared;  /* assigned in CommitTsShmemInit */
     100             : 
     101             : 
     102             : /* GUC variable: the "track_commit_timestamp" configuration parameter */
     103             : bool        track_commit_timestamp;
     104             : /* Prototypes for file-local routines */
     105             : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     106             :                      TransactionId *subxids, TimestampTz ts,
     107             :                      RepOriginId nodeid, int pageno);
     108             : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     109             :                          RepOriginId nodeid, int slotno);
     110             : static void error_commit_ts_disabled(void);
     111             : static int  ZeroCommitTsPage(int pageno, bool writeXlog);
     112             : static bool CommitTsPagePrecedes(int page1, int page2);
     113             : static void ActivateCommitTs(void);
     114             : static void DeactivateCommitTs(void);
     115             : static void WriteZeroPageXlogRec(int pageno);
     116             : static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
     117             : static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
     118             :                          TransactionId *subxids, TimestampTz timestamp,
     119             :                          RepOriginId nodeid);
     120             : 
     121             : /*
     122             :  * TransactionTreeSetCommitTsData
     123             :  *
     124             :  * Record the final commit timestamp of transaction entries in the commit log
     125             :  * for a transaction and its subtransaction tree, as efficiently as possible.
     126             :  *
     127             :  * xid is the top level transaction id.
     128             :  *
     129             :  * subxids is an array of xids of length nsubxids, representing subtransactions
     130             :  * in the tree of xid. In various cases nsubxids may be zero.
     131             :  * The reason why tracking just the parent xid commit timestamp is not enough
     132             :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     133             :  * permanent) so we need to keep the information about them here. If the
     134             :  * subtrans implementation changes in the future, we might want to revisit the
     135             :  * decision of storing timestamp info for each subxid.
     136             :  *
     137             :  * The write_xlog parameter tells us whether to include an XLog record of this
     138             :  * or not.  Normally, this is called from transaction commit routines (both
     139             :  * normal and prepared) and the information will be stored in the transaction
     140             :  * commit XLog record, and so they should pass "false" for this.  The XLog redo
     141             :  * code should use "false" here as well.  Other callers probably want to pass
     142             :  * true, so that the given values persist in case of crashes.
     143             :  */
     144             : void
     145        9878 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     146             :                                TransactionId *subxids, TimestampTz timestamp,
     147             :                                RepOriginId nodeid, bool write_xlog)
     148             : {
     149             :     int         i;
     150             :     TransactionId headxid;
     151             :     TransactionId newestXact;
     152             : 
     153             :     /*
     154             :      * No-op if the module is not active.
     155             :      *
     156             :      * An unlocked read here is fine, because in a standby (the only place
     157             :      * where the flag can change in flight) this routine is only called by the
     158             :      * recovery process, which is also the only process which can change the
     159             :      * flag.
     160             :      */
     161        9878 :     if (!commitTsShared->commitTsActive)
     162       19756 :         return;
     163             : 
     164             :     /*
     165             :      * Comply with the WAL-before-data rule: if caller specified it wants this
     166             :      * value to be recorded in WAL, do so before touching the data.
     167             :      */
     168           0 :     if (write_xlog)
     169           0 :         WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
     170             : 
     171             :     /*
     172             :      * Figure out the latest Xid in this batch: either the last subxid if
     173             :      * there's any, otherwise the parent xid.
     174             :      */
     175           0 :     if (nsubxids > 0)
     176           0 :         newestXact = subxids[nsubxids - 1];
     177             :     else
     178           0 :         newestXact = xid;
     179             : 
     180             :     /*
     181             :      * We split the xids to set the timestamp to in groups belonging to the
     182             :      * same SLRU page; the first element in each such set is its head.  The
     183             :      * first group has the main XID as the head; subsequent sets use the first
     184             :      * subxid not on the previous page as head.  This way, we only have to
     185             :      * lock/modify each SLRU page once.
     186             :      */
     187           0 :     for (i = 0, headxid = xid;;)
     188             :     {
     189           0 :         int         pageno = TransactionIdToCTsPage(headxid);
     190             :         int         j;
     191             : 
     192           0 :         for (j = i; j < nsubxids; j++)
     193             :         {
     194           0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     195           0 :                 break;
     196             :         }
     197             :         /* subxids[i..j-1] are on the same page as the head */
     198             : 
     199           0 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     200             :                              pageno);
     201             : 
     202             :         /* j == nsubxids means no subxid is left unwritten; we're done. */
     203           0 :         if (j >= nsubxids)
     204           0 :             break;
     205             : 
     206             :         /*
     207             :          * subxids[j] is on a different page and has not been written yet: make
     208             :          * it the new head and resume scanning just past it.
     209             :          */
     210           0 :         headxid = subxids[j];
     211           0 :         i = j + 1;
     212           0 :     }
     213             : 
     214             :     /* update the cached value in shared memory */
     215           0 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     216           0 :     commitTsShared->xidLastCommit = xid;
     217           0 :     commitTsShared->dataLastCommit.time = timestamp;
     218           0 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     219             : 
     220             :     /* and move forwards our endpoint, if needed */
     221           0 :     if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
     222           0 :         ShmemVariableCache->newestCommitTsXid = newestXact;
     223           0 :     LWLockRelease(CommitTsLock);
     224             : }
     225             : 
     226             : /*
     227             :  * Record the commit timestamp of xid and of the given subxids, all written
     228             :  * into the single page pageno.  Atomic only on this page.
     229             :  */
     230             : static void
     231           0 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     232             :                      TransactionId *subxids, TimestampTz ts,
     233             :                      RepOriginId nodeid, int pageno)
     234             : {
     235             :     int         slotno;         /* buffer slot used for pageno */
     236             :     int         i;
     237             : 
     238           0 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     239             : 
     240           0 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
     241             : 
     242           0 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);  /* the head xid first */
     243           0 :     for (i = 0; i < nsubxids; i++)
     244           0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     245             : 
     246           0 :     CommitTsCtl->shared->page_dirty[slotno] = true; /* flag the modified slot */
     247             : 
     248           0 :     LWLockRelease(CommitTsControlLock);
     249           0 : }
     250             : 
     251             : /*
     252             :  * Sets the commit timestamp of a single transaction.
     253             :  *
     254             :  * Must be called with CommitTsControlLock held; page_dirty is set by the caller
     255             :  */
     256             : static void
     257           0 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     258             :                          RepOriginId nodeid, int slotno)
     259             : {
     260           0 :     int         entryno = TransactionIdToCTsEntry(xid); /* entry index within the page */
     261             :     CommitTimestampEntry entry;
     262             : 
     263           0 :     Assert(TransactionIdIsNormal(xid));
     264             : 
     265           0 :     entry.time = ts;
     266           0 :     entry.nodeid = nodeid;
     267             :     /* entries are packed back to back, SizeOfCommitTimestampEntry bytes each */
     268           0 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     269             :            SizeOfCommitTimestampEntry * entryno,
     270             :            &entry, SizeOfCommitTimestampEntry);
     271           0 : }
     272             : 
     273             : /*
     274             :  * Interrogate the commit timestamp of a transaction.
     275             :  *
     276             :  * The return value indicates whether a commit timestamp record was found for
     277             :  * the given xid.  The timestamp value is returned in *ts (ts must not be
     278             :  * null), and the origin node for the Xid is returned in *nodeid, if it's not
     279             :  * null.
     280             :  */
     281             : bool
     282           0 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     283             :                              RepOriginId *nodeid)
     284             : {
     285           0 :     int         pageno = TransactionIdToCTsPage(xid);
     286           0 :     int         entryno = TransactionIdToCTsEntry(xid);
     287             :     int         slotno;
     288             :     CommitTimestampEntry entry;
     289             :     TransactionId oldestCommitTsXid;
     290             :     TransactionId newestCommitTsXid;
     291             : 
     292           0 :     if (!TransactionIdIsValid(xid))
     293           0 :         ereport(ERROR,
     294             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     295             :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     296           0 :     else if (!TransactionIdIsNormal(xid))
     297             :     {
     298             :         /* frozen and bootstrap xids are always committed far in the past */
     299           0 :         *ts = 0;
     300           0 :         if (nodeid)
     301           0 :             *nodeid = 0;
     302           0 :         return false;
     303             :     }
     304             : 
     305           0 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     306             : 
     307             :     /* Error if module not enabled */
     308           0 :     if (!commitTsShared->commitTsActive)
     309           0 :         error_commit_ts_disabled();
     310             : 
     311             :     /*
     312             :      * If we're asked for the cached value, return that.  Otherwise, fall
     313             :      * through to read from SLRU.
     314             :      */
     315           0 :     if (commitTsShared->xidLastCommit == xid)
     316             :     {
     317           0 :         *ts = commitTsShared->dataLastCommit.time;
     318           0 :         if (nodeid)
     319           0 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     320             : 
     321           0 :         LWLockRelease(CommitTsLock);
     322           0 :         return *ts != 0;        /* a zero timestamp is reported as "not found" */
     323             :     }
     324             : 
     325           0 :     oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
     326           0 :     newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
     327             :     /* neither is invalid, or both are */
     328           0 :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     329           0 :     LWLockRelease(CommitTsLock);
     330             : 
     331             :     /*
     332             :      * Return empty if the requested value is outside our valid range.
     333             :      */
     334           0 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     335           0 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     336           0 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     337             :     {
     338           0 :         *ts = 0;
     339           0 :         if (nodeid)
     340           0 :             *nodeid = InvalidRepOriginId;
     341           0 :         return false;
     342             :     }
     343             : 
     344             :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     345           0 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
     346           0 :     memcpy(&entry,
     347           0 :            CommitTsCtl->shared->page_buffer[slotno] +
     348             :            SizeOfCommitTimestampEntry * entryno,
     349             :            SizeOfCommitTimestampEntry);
     350             : 
     351           0 :     *ts = entry.time;
     352           0 :     if (nodeid)
     353           0 :         *nodeid = entry.nodeid;
     354             : 
     355           0 :     LWLockRelease(CommitTsControlLock);    /* the lock taken by the read above */
     356           0 :     return *ts != 0;            /* a zero timestamp is reported as "not found" */
     357             : }
     358             : 
     359             : /*
     360             :  * Return the Xid of the latest committed transaction.  (As far as this module
     361             :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     362             :  * for its purposes.)
     363             :  *
     364             :  * ts and nodeid are filled with the corresponding data; they can be passed
     365             :  * as NULL if not wanted.
     366             :  */
     367             : TransactionId
     368           0 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
     369             : {
     370             :     TransactionId xid;
     371             : 
     372           0 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     373             : 
     374             :     /* Error if module not enabled */
     375           0 :     if (!commitTsShared->commitTsActive)
     376           0 :         error_commit_ts_disabled();
     377             :     /* read the cached last-commit data while holding the lock */
     378           0 :     xid = commitTsShared->xidLastCommit;
     379           0 :     if (ts)
     380           0 :         *ts = commitTsShared->dataLastCommit.time;
     381           0 :     if (nodeid)
     382           0 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     383           0 :     LWLockRelease(CommitTsLock);
     384             : 
     385           0 :     return xid;
     386             : }
     387             : /* Report that commit timestamp data is unavailable (module not active). */
     388             : static void
     389           0 : error_commit_ts_disabled(void)
     390             : {
     391           0 :     ereport(ERROR,
     392             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     393             :              errmsg("could not get commit timestamp data"),
     394             :              RecoveryInProgress() ?     /* the hint wording depends on recovery state */
     395             :              errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
     396             :                      "track_commit_timestamp") :
     397             :              errhint("Make sure the configuration parameter \"%s\" is set.",
     398             :                      "track_commit_timestamp")));
     399             : }
     400             : 
     401             : /*
     402             :  * SQL-callable wrapper to obtain commit time of a transaction; yields NULL
     403             :  * when TransactionIdGetCommitTsData finds no record for the given xid.
     404             :  */
     404             : Datum
     405           0 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     406             : {
     407           0 :     TransactionId xid = PG_GETARG_UINT32(0);
     408             :     TimestampTz ts;
     409             :     bool        found;
     410             : 
     411           0 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);   /* origin node not wanted */
     412             : 
     413           0 :     if (!found)
     414           0 :         PG_RETURN_NULL();
     415             : 
     416           0 :     PG_RETURN_TIMESTAMPTZ(ts);
     417             : }
     418             : 
     419             : /* SQL-callable: returns a row (xid, timestamp) for the latest commit, or NULLs. */
     420             : Datum
     421           0 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     422             : {
     423             :     TransactionId xid;
     424             :     TimestampTz ts;
     425             :     Datum       values[2];
     426             :     bool        nulls[2];
     427             :     TupleDesc   tupdesc;
     428             :     HeapTuple   htup;
     429             : 
     430             :     /* fetch the latest commit's xid and timestamp (origin node not wanted) */
     431           0 :     xid = GetLatestCommitTsData(&ts, NULL);
     432             : 
     433             :     /*
     434             :      * Construct a tuple descriptor for the result row.  This must match this
     435             :      * function's pg_proc entry!
     436             :      */
     437           0 :     tupdesc = CreateTemplateTupleDesc(2, false);
     438           0 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
     439             :                        XIDOID, -1, 0);
     440           0 :     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
     441             :                        TIMESTAMPTZOID, -1, 0);
     442           0 :     tupdesc = BlessTupleDesc(tupdesc);
     443             : 
     444           0 :     if (!TransactionIdIsNormal(xid))
     445             :     {
     446           0 :         memset(nulls, true, sizeof(nulls));    /* no usable xid: both columns NULL */
     447             :     }
     448             :     else
     449             :     {
     450           0 :         values[0] = TransactionIdGetDatum(xid);
     451           0 :         nulls[0] = false;
     452             : 
     453           0 :         values[1] = TimestampTzGetDatum(ts);
     454           0 :         nulls[1] = false;
     455             :     }
     456             :     /* and construct a tuple with our data */
     457           0 :     htup = heap_form_tuple(tupdesc, values, nulls);
     458             : 
     459           0 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     460             : }
     461             : 
     462             : 
     463             : /*
     464             :  * Number of shared CommitTS buffers.
     465             :  *
     466             :  * We use logic very similar to that for the number of CLOG buffers; see
     467             :  * comments in CLOGShmemBuffers.
     468             :  */
     469             : Size
     470          10 : CommitTsShmemBuffers(void)
     471             : {
     472          10 :     return Min(16, Max(4, NBuffers / 1024));   /* NBuffers / 1024, clamped to 4..16 */
     473             : }
     474             : 
     475             : /*
     476             :  * Shared memory sizing for CommitTs
     477             :  */
     478             : Size
     479           5 : CommitTsShmemSize(void)
     480             : {
     481           5 :     return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +  /* the SLRU's share */
     482             :         sizeof(CommitTimestampShared);  /* plus our own control struct */
     483             : }
     484             : 
     485             : /*
     486             :  * Initialize CommitTs at system startup (postmaster start or standalone
     487             :  * backend)
     488             :  */
     489             : void
     490           5 : CommitTsShmemInit(void)
     491             : {
     492             :     bool        found;          /* output flag of ShmemInitStruct, asserted below */
     493             : 
     494           5 :     CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
     495           5 :     SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
     496           5 :                   CommitTsControlLock, "pg_commit_ts",
     497             :                   LWTRANCHE_COMMITTS_BUFFERS);
     498             : 
     499           5 :     commitTsShared = ShmemInitStruct("CommitTs shared",
     500             :                                      sizeof(CommitTimestampShared),
     501             :                                      &found);
     502             : 
     503           5 :     if (!IsUnderPostmaster)     /* struct is expected to be new here: fill in defaults */
     504             :     {
     505           5 :         Assert(!found);
     506             : 
     507           5 :         commitTsShared->xidLastCommit = InvalidTransactionId;
     508           5 :         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     509           5 :         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     510           5 :         commitTsShared->commitTsActive = false;    /* module starts out inactive */
     511             :     }
     512             :     else
     513           0 :         Assert(found);
     514           5 : }
     515             : 
     516             : /*
     517             :  * This function must be called ONCE on system install.  It is currently a no-op.
     518             :  *
     519             :  * (The CommitTs directory is assumed to have been created by initdb, and
     520             :  * CommitTsShmemInit must have been called already.)
     521             :  */
     522             : void
     523           1 : BootStrapCommitTs(void)
     524             : {
     525             :     /*
     526             :      * Nothing to do here at present, unlike most other SLRU modules; segments
     527             :      * are created when the server is started with this module enabled. See
     528             :      * ActivateCommitTs.
     529             :      */
     530           1 : }
     531             : 
     532             : /*
     533             :  * Initialize (or reinitialize) a page of CommitTs to zeroes.
     534             :  * If writeXlog is true, also emit an XLOG record saying we did this.
     535             :  *
     536             :  * The page is not actually written, just set up in shared memory.
     537             :  * The slot number of the new page is returned.
     538             :  *
     539             :  * Control lock must be held at entry, and will be held at exit.
     540             :  */
     541             : static int
     542           0 : ZeroCommitTsPage(int pageno, bool writeXlog)
     543             : {
     544             :     int         slotno;
     545             : 
     546           0 :     slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
     547             :     /* note: the XLOG record is emitted after the page has been zeroed */
     548           0 :     if (writeXlog)
     549           0 :         WriteZeroPageXlogRec(pageno);
     550             : 
     551           0 :     return slotno;
     552             : }
     553             : 
     554             : /*
     555             :  * This must be called ONCE during postmaster or standalone-backend startup,
     556             :  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
     557             :  */
     558             : void
     559           0 : StartupCommitTs(void)
     560             : {
     561           0 :     ActivateCommitTs();         /* all the work is delegated to ActivateCommitTs */
     562           0 : }
     563             : 
     564             : /*
     565             :  * This must be called ONCE during postmaster or standalone-backend startup,
     566             :  * after recovery has finished.
     567             :  */
     568             : void
     569           3 : CompleteCommitTsInitialization(void)
     570             : {
     571             :     /*
     572             :      * If the feature is not enabled, turn it off for good.  This also removes
     573             :      * any leftover data.
     574             :      *
     575             :      * Conversely, we activate the module if the feature is enabled.  This is
     576             :      * not necessary in a master system because we already did it earlier, but
     577             :      * if we're in a standby server that got promoted which had the feature
     578             :      * enabled and was following a master that had the feature disabled, this
     579             :      * is where we turn it on locally.
     580             :      */
     581           3 :     if (!track_commit_timestamp)    /* the local GUC setting decides */
     582           3 :         DeactivateCommitTs();
     583             :     else
     584           0 :         ActivateCommitTs();
     585           3 : }
     586             : 
     587             : /*
     588             :  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
     589             :  * XLog record in a standby.
     590             :  */
     591             : void
     592           0 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     593             : {
     594             :     /*
     595             :      * If the commit_ts module is disabled in this server and we get word from
     596             :      * the master server that it is enabled there, activate it so that we can
     597             :      * replay future WAL records involving it; also mark it as active on
     598             :      * pg_control.  If the old value was already set, we already did this, so
     599             :      * don't do anything (the code tests commitTsActive; oldvalue is unused).
     600             :      *
     601             :      * If the module is disabled in the master, disable it here too, unless
     602             :      * the module is enabled locally.
     603             :      *
     604             :      * Note this only runs in the recovery process, so an unlocked read is
     605             :      * fine.
     606             :      */
     607           0 :     if (newvalue)
     608             :     {
     609           0 :         if (!commitTsShared->commitTsActive)
     610           0 :             ActivateCommitTs();
     611             :     }
     612           0 :     else if (commitTsShared->commitTsActive)
     613           0 :         DeactivateCommitTs();
     614           0 : }
     615             : 
     616             : /*
     617             :  * Activate this module whenever necessary.
     618             :  *      This must happen during postmaster or standalone-backend startup,
     619             :  *      or during WAL replay anytime the track_commit_timestamp setting is
     620             :  *      changed in the master.
     621             :  *
     622             :  * The reason why this SLRU needs separate activation/deactivation functions is
     623             :  * that it can be enabled/disabled during start and the activation/deactivation
     624             :  * on master is propagated to standby via replay. Other SLRUs don't have this
     625             :  * property and they can be just initialized during normal startup.
     626             :  *
     627             :  * This is in charge of creating the currently active segment, if it's not
     628             :  * already there.  The reason for this is that the server might have been
     629             :  * running with this module disabled for a while and thus might have skipped
     630             :  * the normal creation point.
     631             :  */
     632             : static void
     633           0 : ActivateCommitTs(void)
     634             : {
     635             :     TransactionId xid;
     636             :     int         pageno;
     637             : 
     638             :     /* If we've done this already, there's nothing to do */
     639           0 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     640           0 :     if (commitTsShared->commitTsActive)
     641             :     {
     642           0 :         LWLockRelease(CommitTsLock);
     643           0 :         return;
     644             :     }
     645           0 :     LWLockRelease(CommitTsLock);
     646             : 
     647           0 :     xid = ShmemVariableCache->nextXid;
     648           0 :     pageno = TransactionIdToCTsPage(xid);
     649             : 
     650             :     /*
     651             :      * Re-initialize our idea of the latest page number: the page holding nextXid.
     652             :      */
     653           0 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     654           0 :     CommitTsCtl->shared->latest_page_number = pageno;
     655           0 :     LWLockRelease(CommitTsControlLock);
     656             : 
     657             :     /*
     658             :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     659             :      * need to set the oldest and newest values to the next Xid; that way, we
     660             :      * will not try to read data that might not have been set.
     661             :      *
     662             :      * XXX does this have a problem if a server is started with commitTs
     663             :      * enabled, then started with commitTs disabled, then restarted with it
     664             :      * enabled again?  It doesn't look like it does, because there should be a
     665             :      * checkpoint that sets the value to InvalidTransactionId at end of
     666             :      * recovery; and so any chance of injecting new transactions without
     667             :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     668             :      * Invalid temporarily.
     669             :      */
     670           0 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     671           0 :     if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
     672             :     {
     673           0 :         ShmemVariableCache->oldestCommitTsXid =
     674           0 :             ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
     675             :     }
     676           0 :     LWLockRelease(CommitTsLock);
     677             : 
     678             :     /* Create the current segment file, if necessary: zero nextXid's page (no XLOG record) and write it out */
     679           0 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     680             :     {
     681             :         int         slotno;
     682             : 
     683           0 :         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     684           0 :         slotno = ZeroCommitTsPage(pageno, false);
     685           0 :         SimpleLruWritePage(CommitTsCtl, slotno);
     686           0 :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
     687           0 :         LWLockRelease(CommitTsControlLock);
     688             :     }
     689             : 
     690             :     /* Change the activation status in shared memory. */
     691           0 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     692           0 :     commitTsShared->commitTsActive = true;
     693           0 :     LWLockRelease(CommitTsLock);
     694             : }
     695             : 
     696             : /*
     697             :  * Deactivate this module.
     698             :  *
     699             :  * This must be called when the track_commit_timestamp parameter is turned off.
     700             :  * This happens during postmaster or standalone-backend startup, or during WAL
     701             :  * replay.
     702             :  *
     703             :  * Resets CommitTs into invalid state to make sure we don't hand back
     704             :  * possibly-invalid data; also removes segments of old data.
     705             :  */
     706             : static void
     707           3 : DeactivateCommitTs(void)
     708             : {
     709             :     /*
     710             :      * Clean up the status in shared memory.
     711             :      *
     712             :      * We reset everything in the commitTsShared record to prevent users from
     713             :      * getting confusing data about the last committed transaction on the standby
     714             :      * when the module was activated repeatedly on the primary.
     715             :      */
     716           3 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     717             : 
     718           3 :     commitTsShared->commitTsActive = false;
     719           3 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     720           3 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     721           3 :     commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     722             : 
     723           3 :     ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
     724           3 :     ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
     725             : 
     726           3 :     LWLockRelease(CommitTsLock);
     727             : 
     728             :     /*
     729             :      * Remove *all* files.  This is necessary so that there are no leftover
     730             :      * files; in the case where this feature is later enabled after running
     731             :      * with it disabled for some time there may be a gap in the file sequence.
     732             :      * (We can probably tolerate out-of-sequence files, as they are going to
     733             :      * be overwritten anyway when we wrap around, but it seems better to be
     734             :      * tidy.)
     735             :      */
     736           3 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     737           3 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
     738           3 :     LWLockRelease(CommitTsControlLock);
     739           3 : }
     740             : 
     741             : /*
     742             :  * This must be called ONCE during postmaster or standalone-backend shutdown.  Flushes CommitTs pages, then fsyncs pg_commit_ts.
     743             :  */
     744             : void
     745           3 : ShutdownCommitTs(void)
     746             : {
     747             :     /* Flush dirty CommitTs pages to disk */
     748           3 :     SimpleLruFlush(CommitTsCtl, false);
     749             : 
     750             :     /*
     751             :      * fsync pg_commit_ts to ensure that any files flushed previously are
     752             :      * durably on disk.
     753             :      */
     754           3 :     fsync_fname("pg_commit_ts", true);
     755           3 : }
     756             : 
     757             : /*
     758             :  * Perform a checkpoint --- either during shutdown, or on-the-fly.  Flushes CommitTs pages, then fsyncs pg_commit_ts.
     759             :  */
     760             : void
     761          11 : CheckPointCommitTs(void)
     762             : {
     763             :     /* Flush dirty CommitTs pages to disk */
     764          11 :     SimpleLruFlush(CommitTsCtl, true);
     765             : 
     766             :     /*
     767             :      * fsync pg_commit_ts to ensure that any files flushed previously are
     768             :      * durably on disk.
     769             :      */
     770          11 :     fsync_fname("pg_commit_ts", true);
     771          11 : }
     772             : 
     773             : /*
     774             :  * Make sure that CommitTs has room for a newly-allocated XID.
     775             :  *
     776             :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     777             :  * most of the time; even when it's not so fast, no actual I/O need happen
     778             :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     779             :  * in shared memory.
     780             :  *
     781             :  * NB: the current implementation relies on track_commit_timestamp being
     782             :  * PGC_POSTMASTER.
     783             :  */
     784             : void
     785       10625 : ExtendCommitTs(TransactionId newestXact)
     786             : {
     787             :     int         pageno;
     788             : 
     789             :     /*
     790             :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     791             :      * the flag here, which is okay because this routine is only called from
     792             :      * GetNewTransactionId, which is never called in a standby.
     793             :      */
     794       10625 :     Assert(!InRecovery);
     795       10625 :     if (!commitTsShared->commitTsActive)
     796       10625 :         return;
     797             : 
     798             :     /*
     799             :      * No work except at first XID of a page.  But beware: just after
     800             :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     801             :      */
     802           0 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     803             :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     804           0 :         return;
     805             : 
     806           0 :     pageno = TransactionIdToCTsPage(newestXact);
     807             : 
     808           0 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     809             : 
     810             :     /* Zero the page and make an XLOG entry about it (InRecovery was asserted false above) */
     811           0 :     ZeroCommitTsPage(pageno, !InRecovery);
     812             : 
     813           0 :     LWLockRelease(CommitTsControlLock);
     814             : }
     815             : 
     816             : /*
     817             :  * Remove all CommitTs segments before the one holding the passed
     818             :  * transaction ID.
     819             :  *
     820             :  * Note that we don't need to flush XLOG here.
     821             :  */
     822             : void
     823           2 : TruncateCommitTs(TransactionId oldestXact)
     824             : {
     825             :     int         cutoffPage;
     826             : 
     827             :     /*
     828             :      * The cutoff point is the start of the segment containing oldestXact. We
     829             :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     830             :      */
     831           2 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     832             : 
     833             :     /* Check to see if there are any files that could be removed */
     834           2 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     835             :                            &cutoffPage))
     836           4 :         return;                 /* nothing to remove */
     837             : 
     838             :     /* Write XLOG record (carries both the cutoff page and oldestXact) */
     839           0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     840             : 
     841             :     /* Now we can remove the old CommitTs segment(s) */
     842           0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     843             : }
     844             : 
     845             : /*
     846             :  * Set the limit values (oldestCommitTsXid, newestCommitTsXid) between which commit TS can be consulted.
     847             :  */
     848             : void
     849           4 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     850             : {
     851             :     /*
     852             :      * Be careful not to overwrite values that are either further into the
     853             :      * "future" or signal a disabled committs.  If limits are already set, each is replaced only when its Precedes test passes; otherwise both are installed as given.
     854             :      */
     855           4 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     856           4 :     if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
     857             :     {
     858           0 :         if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
     859           0 :             ShmemVariableCache->oldestCommitTsXid = oldestXact;
     860           0 :         if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
     861           0 :             ShmemVariableCache->newestCommitTsXid = newestXact;
     862             :     }
     863             :     else
     864             :     {
     865           4 :         Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
     866           4 :         ShmemVariableCache->oldestCommitTsXid = oldestXact;
     867           4 :         ShmemVariableCache->newestCommitTsXid = newestXact;
     868             :     }
     869           4 :     LWLockRelease(CommitTsLock);
     870           4 : }
     871             : 
     872             : /*
     873             :  * Move forward the oldest commitTS value that can be consulted; no-op if it is Invalid or does not precede oldestXact.
     874             :  */
     875             : void
     876           2 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     877             : {
     878           2 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     879           2 :     if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
     880           0 :         TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
     881           0 :         ShmemVariableCache->oldestCommitTsXid = oldestXact;
     882           2 :     LWLockRelease(CommitTsLock);
     883           2 : }
     884             : 
     885             : 
     886             : /*
     887             :  * Decide which of two CommitTs page numbers is "older" for truncation purposes.
     888             :  *
     889             :  * We need to use comparison of TransactionIds here in order to do the right
     890             :  * thing with wraparound XID arithmetic.  However, if we are asked about
     891             :  * page number zero, we don't want to hand InvalidTransactionId to
     892             :  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
     893             :  * offset both xids by FirstNormalTransactionId to avoid that.
     894             :  */
     895             : static bool
     896           0 : CommitTsPagePrecedes(int page1, int page2)
     897             : {
     898             :     TransactionId xid1;
     899             :     TransactionId xid2;
     900             : 
     901           0 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
     902           0 :     xid1 += FirstNormalTransactionId;
     903           0 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
     904           0 :     xid2 += FirstNormalTransactionId;
     905             : 
     906           0 :     return TransactionIdPrecedes(xid1, xid2);
     907             : }
     908             : 
     909             : 
     910             : /*
     911             :  * Write a ZEROPAGE xlog record; the payload is just the page number (an int)
     912             :  */
     913             : static void
     914           0 : WriteZeroPageXlogRec(int pageno)
     915             : {
     916           0 :     XLogBeginInsert();
     917           0 :     XLogRegisterData((char *) (&pageno), sizeof(int));
     918           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
     919           0 : }
     920             : 
     921             : /*
     922             :  * Write a TRUNCATE xlog record; the payload is an xl_commit_ts_truncate (pageno, oldestXid)
     923             :  */
     924             : static void
     925           0 : WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
     926             : {
     927             :     xl_commit_ts_truncate xlrec;
     928             : 
     929           0 :     xlrec.pageno = pageno;
     930           0 :     xlrec.oldestXid = oldestXid;
     931             : 
     932           0 :     XLogBeginInsert();
     933           0 :     XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
     934           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
     935           0 : }
     936             : 
     937             : /*
     938             :  * Write a SETTS xlog record: xl_commit_ts_set up to and including mainxid, followed by the nsubxids subxids
     939             :  */
     940             : static void
     941           0 : WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
     942             :                          TransactionId *subxids, TimestampTz timestamp,
     943             :                          RepOriginId nodeid)
     944             : {
     945             :     xl_commit_ts_set record;
     946             : 
     947           0 :     record.timestamp = timestamp;
     948           0 :     record.nodeid = nodeid;
     949           0 :     record.mainxid = mainxid;
     950             : 
     951           0 :     XLogBeginInsert();
     952           0 :     XLogRegisterData((char *) &record,
     953             :                      offsetof(xl_commit_ts_set, mainxid) +
     954             :                      sizeof(TransactionId));
     955           0 :     XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
     956           0 :     XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
     957           0 : }
     958             : 
     959             : /*
     960             :  * CommitTS resource manager's routines: redo handler for ZEROPAGE, TRUNCATE and SETTS records
     961             :  */
     962             : void
     963           0 : commit_ts_redo(XLogReaderState *record)
     964             : {
     965           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     966             : 
     967             :     /* Backup blocks are not used in commit_ts records */
     968           0 :     Assert(!XLogRecHasAnyBlockRefs(record));
     969             : 
     970           0 :     if (info == COMMIT_TS_ZEROPAGE)
     971             :     {
     972             :         int         pageno;
     973             :         int         slotno;
     974             : 
     975           0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(int));
     976             : 
     977           0 :         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     978             : 
     979           0 :         slotno = ZeroCommitTsPage(pageno, false);
     980           0 :         SimpleLruWritePage(CommitTsCtl, slotno);
     981           0 :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
     982             : 
     983           0 :         LWLockRelease(CommitTsControlLock);
     984             :     }
     985           0 :     else if (info == COMMIT_TS_TRUNCATE)
     986             :     {
     987           0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
     988             : 
     989           0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
     990             : 
     991             :         /*
     992             :          * During XLOG replay, latest_page_number isn't set up yet; insert a
     993             :          * suitable value to bypass the sanity test in SimpleLruTruncate.
     994             :          */
     995           0 :         CommitTsCtl->shared->latest_page_number = trunc->pageno;
     996             : 
     997           0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
     998             :     }
     999           0 :     else if (info == COMMIT_TS_SETTS)
    1000             :     {
    1001           0 :         xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
    1002             :         int         nsubxids;
    1003             :         TransactionId *subxids;
    1004             : 
    1005           0 :         nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
    1006             :                     sizeof(TransactionId));    /* data past the fixed part is the subxid array */
    1007           0 :         if (nsubxids > 0)
    1008             :         {
    1009           0 :             subxids = palloc(sizeof(TransactionId) * nsubxids);
    1010           0 :             memcpy(subxids,
    1011           0 :                    XLogRecGetData(record) + SizeOfCommitTsSet,
    1012             :                    sizeof(TransactionId) * nsubxids);
    1013             :         }
    1014             :         else
    1015           0 :             subxids = NULL;
    1016             : 
    1017           0 :         TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
    1018           0 :                                        setts->timestamp, setts->nodeid, true);
    1019           0 :         if (subxids)
    1020           0 :             pfree(subxids);
    1021             :     }
    1022             :     else
    1023           0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1024           0 : }

Generated by: LCOV version 1.11