Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 : * generated for setting of values when the caller requests it; this allows
12 : * us to support values coming from places other than transaction commit.
13 : * Other writes of CommitTS come from recording of transaction commit in
14 : * xact.c, which generates its own XLOG records for these events and will
15 : * re-perform the status update on redo; so we need make no additional XLOG
16 : * entry here.
17 : *
18 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19 : * Portions Copyright (c) 1994, Regents of the University of California
20 : *
21 : * src/backend/access/transam/commit_ts.c
22 : *
23 : *-------------------------------------------------------------------------
24 : */
25 : #include "postgres.h"
26 :
27 : #include "access/commit_ts.h"
28 : #include "access/htup_details.h"
29 : #include "access/slru.h"
30 : #include "access/transam.h"
31 : #include "catalog/pg_type.h"
32 : #include "funcapi.h"
33 : #include "miscadmin.h"
34 : #include "pg_trace.h"
35 : #include "storage/shmem.h"
36 : #include "utils/builtins.h"
37 : #include "utils/snapmgr.h"
38 : #include "utils/timestamp.h"
39 :
40 : /*
41 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42 : * everywhere else in Postgres.
43 : *
44 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45 : * CommitTs page numbering also wraps around at
46 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48 : * explicit notice of that fact in this module, except when comparing segment
49 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50 : */
51 :
52 : /*
53 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54 : * the largest possible file name is more than 5 chars long; see
55 : * SlruScanDirectory.
56 : */
57 : typedef struct CommitTimestampEntry
58 : {
59 : TimestampTz time;
60 : RepOriginId nodeid;
61 : } CommitTimestampEntry;
62 :
63 : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64 : sizeof(RepOriginId))
65 :
66 : #define COMMIT_TS_XACTS_PER_PAGE \
67 : (BLCKSZ / SizeOfCommitTimestampEntry)
68 :
69 : #define TransactionIdToCTsPage(xid) \
70 : ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 : #define TransactionIdToCTsEntry(xid) \
72 : ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 :
74 : /*
75 : * Link to shared-memory data structures for CommitTs control
76 : */
77 : static SlruCtlData CommitTsCtlData;
78 :
79 : #define CommitTsCtl (&CommitTsCtlData)
80 :
81 : /*
82 : * We keep a cache of the last value set in shared memory.
83 : *
84 : * This is also good place to keep the activation status. We keep this
85 : * separate from the GUC so that the standby can activate the module if the
86 : * primary has it active independently of the value of the GUC.
87 : *
88 : * This is protected by CommitTsLock. In some places, we use commitTsActive
89 : * without acquiring the lock; where this happens, a comment explains the
90 : * rationale for it.
91 : */
92 : typedef struct CommitTimestampShared
93 : {
94 : TransactionId xidLastCommit;
95 : CommitTimestampEntry dataLastCommit;
96 : bool commitTsActive;
97 : } CommitTimestampShared;
98 :
99 : CommitTimestampShared *commitTsShared;
100 :
101 :
102 : /* GUC variable */
103 : bool track_commit_timestamp;
104 :
105 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106 : TransactionId *subxids, TimestampTz ts,
107 : RepOriginId nodeid, int pageno);
108 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109 : RepOriginId nodeid, int slotno);
110 : static void error_commit_ts_disabled(void);
111 : static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 : static bool CommitTsPagePrecedes(int page1, int page2);
113 : static void ActivateCommitTs(void);
114 : static void DeactivateCommitTs(void);
115 : static void WriteZeroPageXlogRec(int pageno);
116 : static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 : static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
118 : TransactionId *subxids, TimestampTz timestamp,
119 : RepOriginId nodeid);
120 :
121 : /*
122 : * TransactionTreeSetCommitTsData
123 : *
124 : * Record the final commit timestamp of transaction entries in the commit log
125 : * for a transaction and its subtransaction tree, as efficiently as possible.
126 : *
127 : * xid is the top level transaction id.
128 : *
129 : * subxids is an array of xids of length nsubxids, representing subtransactions
130 : * in the tree of xid. In various cases nsubxids may be zero.
131 : * The reason why tracking just the parent xid commit timestamp is not enough
132 : * is that the subtrans SLRU does not stay valid across crashes (it's not
133 : * permanent) so we need to keep the information about them here. If the
134 : * subtrans implementation changes in the future, we might want to revisit the
135 : * decision of storing timestamp info for each subxid.
136 : *
137 : * The write_xlog parameter tells us whether to include an XLog record of this
138 : * or not. Normally, this is called from transaction commit routines (both
139 : * normal and prepared) and the information will be stored in the transaction
140 : * commit XLog record, and so they should pass "false" for this. The XLog redo
141 : * code should use "false" here as well. Other callers probably want to pass
142 : * true, so that the given values persist in case of crashes.
143 : */
144 : void
145 9878 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146 : TransactionId *subxids, TimestampTz timestamp,
147 : RepOriginId nodeid, bool write_xlog)
148 : {
149 : int i;
150 : TransactionId headxid;
151 : TransactionId newestXact;
152 :
153 : /*
154 : * No-op if the module is not active.
155 : *
156 : * An unlocked read here is fine, because in a standby (the only place
157 : * where the flag can change in flight) this routine is only called by the
158 : * recovery process, which is also the only process which can change the
159 : * flag.
160 : */
161 9878 : if (!commitTsShared->commitTsActive)
162 19756 : return;
163 :
164 : /*
165 : * Comply with the WAL-before-data rule: if caller specified it wants this
166 : * value to be recorded in WAL, do so before touching the data.
167 : */
168 0 : if (write_xlog)
169 0 : WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 :
171 : /*
172 : * Figure out the latest Xid in this batch: either the last subxid if
173 : * there's any, otherwise the parent xid.
174 : */
175 0 : if (nsubxids > 0)
176 0 : newestXact = subxids[nsubxids - 1];
177 : else
178 0 : newestXact = xid;
179 :
180 : /*
181 : * We split the xids to set the timestamp to in groups belonging to the
182 : * same SLRU page; the first element in each such set is its head. The
183 : * first group has the main XID as the head; subsequent sets use the first
184 : * subxid not on the previous page as head. This way, we only have to
185 : * lock/modify each SLRU page once.
186 : */
187 0 : for (i = 0, headxid = xid;;)
188 : {
189 0 : int pageno = TransactionIdToCTsPage(headxid);
190 : int j;
191 :
192 0 : for (j = i; j < nsubxids; j++)
193 : {
194 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 0 : break;
196 : }
197 : /* subxids[i..j] are on the same page as the head */
198 :
199 0 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 : pageno);
201 :
202 : /* if we wrote out all subxids, we're done. */
203 0 : if (j + 1 >= nsubxids)
204 0 : break;
205 :
206 : /*
207 : * Set the new head and skip over it, as well as over the subxids we
208 : * just wrote.
209 : */
210 0 : headxid = subxids[j];
211 0 : i += j - i + 1;
212 0 : }
213 :
214 : /* update the cached value in shared memory */
215 0 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 0 : commitTsShared->xidLastCommit = xid;
217 0 : commitTsShared->dataLastCommit.time = timestamp;
218 0 : commitTsShared->dataLastCommit.nodeid = nodeid;
219 :
220 : /* and move forwards our endpoint, if needed */
221 0 : if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222 0 : ShmemVariableCache->newestCommitTsXid = newestXact;
223 0 : LWLockRelease(CommitTsLock);
224 : }
225 :
226 : /*
227 : * Record the commit timestamp of transaction entries in the commit log for all
228 : * entries on a single page. Atomic only on this page.
229 : */
230 : static void
231 0 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 : TransactionId *subxids, TimestampTz ts,
233 : RepOriginId nodeid, int pageno)
234 : {
235 : int slotno;
236 : int i;
237 :
238 0 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239 :
240 0 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 :
242 0 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 0 : for (i = 0; i < nsubxids; i++)
244 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 :
246 0 : CommitTsCtl->shared->page_dirty[slotno] = true;
247 :
248 0 : LWLockRelease(CommitTsControlLock);
249 0 : }
250 :
251 : /*
252 : * Sets the commit timestamp of a single transaction.
253 : *
254 : * Must be called with CommitTsControlLock held
255 : */
256 : static void
257 0 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258 : RepOriginId nodeid, int slotno)
259 : {
260 0 : int entryno = TransactionIdToCTsEntry(xid);
261 : CommitTimestampEntry entry;
262 :
263 0 : Assert(TransactionIdIsNormal(xid));
264 :
265 0 : entry.time = ts;
266 0 : entry.nodeid = nodeid;
267 :
268 0 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269 : SizeOfCommitTimestampEntry * entryno,
270 : &entry, SizeOfCommitTimestampEntry);
271 0 : }
272 :
273 : /*
274 : * Interrogate the commit timestamp of a transaction.
275 : *
276 : * The return value indicates whether a commit timestamp record was found for
277 : * the given xid. The timestamp value is returned in *ts (which may not be
278 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
279 : * null.
280 : */
281 : bool
282 0 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283 : RepOriginId *nodeid)
284 : {
285 0 : int pageno = TransactionIdToCTsPage(xid);
286 0 : int entryno = TransactionIdToCTsEntry(xid);
287 : int slotno;
288 : CommitTimestampEntry entry;
289 : TransactionId oldestCommitTsXid;
290 : TransactionId newestCommitTsXid;
291 :
292 0 : if (!TransactionIdIsValid(xid))
293 0 : ereport(ERROR,
294 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 0 : else if (!TransactionIdIsNormal(xid))
297 : {
298 : /* frozen and bootstrap xids are always committed far in the past */
299 0 : *ts = 0;
300 0 : if (nodeid)
301 0 : *nodeid = 0;
302 0 : return false;
303 : }
304 :
305 0 : LWLockAcquire(CommitTsLock, LW_SHARED);
306 :
307 : /* Error if module not enabled */
308 0 : if (!commitTsShared->commitTsActive)
309 0 : error_commit_ts_disabled();
310 :
311 : /*
312 : * If we're asked for the cached value, return that. Otherwise, fall
313 : * through to read from SLRU.
314 : */
315 0 : if (commitTsShared->xidLastCommit == xid)
316 : {
317 0 : *ts = commitTsShared->dataLastCommit.time;
318 0 : if (nodeid)
319 0 : *nodeid = commitTsShared->dataLastCommit.nodeid;
320 :
321 0 : LWLockRelease(CommitTsLock);
322 0 : return *ts != 0;
323 : }
324 :
325 0 : oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326 0 : newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327 : /* neither is invalid, or both are */
328 0 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329 0 : LWLockRelease(CommitTsLock);
330 :
331 : /*
332 : * Return empty if the requested value is outside our valid range.
333 : */
334 0 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 0 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 0 : TransactionIdPrecedes(newestCommitTsXid, xid))
337 : {
338 0 : *ts = 0;
339 0 : if (nodeid)
340 0 : *nodeid = InvalidRepOriginId;
341 0 : return false;
342 : }
343 :
344 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
345 0 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346 0 : memcpy(&entry,
347 0 : CommitTsCtl->shared->page_buffer[slotno] +
348 : SizeOfCommitTimestampEntry * entryno,
349 : SizeOfCommitTimestampEntry);
350 :
351 0 : *ts = entry.time;
352 0 : if (nodeid)
353 0 : *nodeid = entry.nodeid;
354 :
355 0 : LWLockRelease(CommitTsControlLock);
356 0 : return *ts != 0;
357 : }
358 :
359 : /*
360 : * Return the Xid of the latest committed transaction. (As far as this module
361 : * is concerned, anyway; it's up to the caller to ensure the value is useful
362 : * for its purposes.)
363 : *
364 : * ts and extra are filled with the corresponding data; they can be passed
365 : * as NULL if not wanted.
366 : */
367 : TransactionId
368 0 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369 : {
370 : TransactionId xid;
371 :
372 0 : LWLockAcquire(CommitTsLock, LW_SHARED);
373 :
374 : /* Error if module not enabled */
375 0 : if (!commitTsShared->commitTsActive)
376 0 : error_commit_ts_disabled();
377 :
378 0 : xid = commitTsShared->xidLastCommit;
379 0 : if (ts)
380 0 : *ts = commitTsShared->dataLastCommit.time;
381 0 : if (nodeid)
382 0 : *nodeid = commitTsShared->dataLastCommit.nodeid;
383 0 : LWLockRelease(CommitTsLock);
384 :
385 0 : return xid;
386 : }
387 :
388 : static void
389 0 : error_commit_ts_disabled(void)
390 : {
391 0 : ereport(ERROR,
392 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 : errmsg("could not get commit timestamp data"),
394 : RecoveryInProgress() ?
395 : errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396 : "track_commit_timestamp") :
397 : errhint("Make sure the configuration parameter \"%s\" is set.",
398 : "track_commit_timestamp")));
399 : }
400 :
401 : /*
402 : * SQL-callable wrapper to obtain commit time of a transaction
403 : */
404 : Datum
405 0 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
406 : {
407 0 : TransactionId xid = PG_GETARG_UINT32(0);
408 : TimestampTz ts;
409 : bool found;
410 :
411 0 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 :
413 0 : if (!found)
414 0 : PG_RETURN_NULL();
415 :
416 0 : PG_RETURN_TIMESTAMPTZ(ts);
417 : }
418 :
419 :
420 : Datum
421 0 : pg_last_committed_xact(PG_FUNCTION_ARGS)
422 : {
423 : TransactionId xid;
424 : TimestampTz ts;
425 : Datum values[2];
426 : bool nulls[2];
427 : TupleDesc tupdesc;
428 : HeapTuple htup;
429 :
430 : /* and construct a tuple with our data */
431 0 : xid = GetLatestCommitTsData(&ts, NULL);
432 :
433 : /*
434 : * Construct a tuple descriptor for the result row. This must match this
435 : * function's pg_proc entry!
436 : */
437 0 : tupdesc = CreateTemplateTupleDesc(2, false);
438 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439 : XIDOID, -1, 0);
440 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441 : TIMESTAMPTZOID, -1, 0);
442 0 : tupdesc = BlessTupleDesc(tupdesc);
443 :
444 0 : if (!TransactionIdIsNormal(xid))
445 : {
446 0 : memset(nulls, true, sizeof(nulls));
447 : }
448 : else
449 : {
450 0 : values[0] = TransactionIdGetDatum(xid);
451 0 : nulls[0] = false;
452 :
453 0 : values[1] = TimestampTzGetDatum(ts);
454 0 : nulls[1] = false;
455 : }
456 :
457 0 : htup = heap_form_tuple(tupdesc, values, nulls);
458 :
459 0 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
460 : }
461 :
462 :
463 : /*
464 : * Number of shared CommitTS buffers.
465 : *
466 : * We use a very similar logic as for the number of CLOG buffers; see comments
467 : * in CLOGShmemBuffers.
468 : */
469 : Size
470 10 : CommitTsShmemBuffers(void)
471 : {
472 10 : return Min(16, Max(4, NBuffers / 1024));
473 : }
474 :
475 : /*
476 : * Shared memory sizing for CommitTs
477 : */
478 : Size
479 5 : CommitTsShmemSize(void)
480 : {
481 5 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482 : sizeof(CommitTimestampShared);
483 : }
484 :
485 : /*
486 : * Initialize CommitTs at system startup (postmaster start or standalone
487 : * backend)
488 : */
489 : void
490 5 : CommitTsShmemInit(void)
491 : {
492 : bool found;
493 :
494 5 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495 5 : SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496 5 : CommitTsControlLock, "pg_commit_ts",
497 : LWTRANCHE_COMMITTS_BUFFERS);
498 :
499 5 : commitTsShared = ShmemInitStruct("CommitTs shared",
500 : sizeof(CommitTimestampShared),
501 : &found);
502 :
503 5 : if (!IsUnderPostmaster)
504 : {
505 5 : Assert(!found);
506 :
507 5 : commitTsShared->xidLastCommit = InvalidTransactionId;
508 5 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509 5 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510 5 : commitTsShared->commitTsActive = false;
511 : }
512 : else
513 0 : Assert(found);
514 5 : }
515 :
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * Unlike most other SLRU modules, there is currently nothing to do here:
 * segments are created when the server is started with this module enabled.
 * See ActivateCommitTs.
 */
void
BootStrapCommitTs(void)
{
	/* intentionally empty */
}
531 :
532 : /*
533 : * Initialize (or reinitialize) a page of CommitTs to zeroes.
534 : * If writeXlog is TRUE, also emit an XLOG record saying we did this.
535 : *
536 : * The page is not actually written, just set up in shared memory.
537 : * The slot number of the new page is returned.
538 : *
539 : * Control lock must be held at entry, and will be held at exit.
540 : */
541 : static int
542 0 : ZeroCommitTsPage(int pageno, bool writeXlog)
543 : {
544 : int slotno;
545 :
546 0 : slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547 :
548 0 : if (writeXlog)
549 0 : WriteZeroPageXlogRec(pageno);
550 :
551 0 : return slotno;
552 : }
553 :
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
563 :
564 : /*
565 : * This must be called ONCE during postmaster or standalone-backend startup,
566 : * after recovery has finished.
567 : */
568 : void
569 3 : CompleteCommitTsInitialization(void)
570 : {
571 : /*
572 : * If the feature is not enabled, turn it off for good. This also removes
573 : * any leftover data.
574 : *
575 : * Conversely, we activate the module if the feature is enabled. This is
576 : * not necessary in a master system because we already did it earlier, but
577 : * if we're in a standby server that got promoted which had the feature
578 : * enabled and was following a master that had the feature disabled, this
579 : * is where we turn it on locally.
580 : */
581 3 : if (!track_commit_timestamp)
582 3 : DeactivateCommitTs();
583 : else
584 0 : ActivateCommitTs();
585 3 : }
586 :
587 : /*
588 : * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
589 : * XLog record in a standby.
590 : */
591 : void
592 0 : CommitTsParameterChange(bool newvalue, bool oldvalue)
593 : {
594 : /*
595 : * If the commit_ts module is disabled in this server and we get word from
596 : * the master server that it is enabled there, activate it so that we can
597 : * replay future WAL records involving it; also mark it as active on
598 : * pg_control. If the old value was already set, we already did this, so
599 : * don't do anything.
600 : *
601 : * If the module is disabled in the master, disable it here too, unless
602 : * the module is enabled locally.
603 : *
604 : * Note this only runs in the recovery process, so an unlocked read is
605 : * fine.
606 : */
607 0 : if (newvalue)
608 : {
609 0 : if (!commitTsShared->commitTsActive)
610 0 : ActivateCommitTs();
611 : }
612 0 : else if (commitTsShared->commitTsActive)
613 0 : DeactivateCommitTs();
614 0 : }
615 :
616 : /*
617 : * Activate this module whenever necessary.
618 : * This must happen during postmaster or standalone-backend startup,
619 : * or during WAL replay anytime the track_commit_timestamp setting is
620 : * changed in the master.
621 : *
622 : * The reason why this SLRU needs separate activation/deactivation functions is
623 : * that it can be enabled/disabled during start and the activation/deactivation
624 : * on master is propagated to standby via replay. Other SLRUs don't have this
625 : * property and they can be just initialized during normal startup.
626 : *
627 : * This is in charge of creating the currently active segment, if it's not
628 : * already there. The reason for this is that the server might have been
629 : * running with this module disabled for a while and thus might have skipped
630 : * the normal creation point.
631 : */
632 : static void
633 0 : ActivateCommitTs(void)
634 : {
635 : TransactionId xid;
636 : int pageno;
637 :
638 : /* If we've done this already, there's nothing to do */
639 0 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640 0 : if (commitTsShared->commitTsActive)
641 : {
642 0 : LWLockRelease(CommitTsLock);
643 0 : return;
644 : }
645 0 : LWLockRelease(CommitTsLock);
646 :
647 0 : xid = ShmemVariableCache->nextXid;
648 0 : pageno = TransactionIdToCTsPage(xid);
649 :
650 : /*
651 : * Re-Initialize our idea of the latest page number.
652 : */
653 0 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654 0 : CommitTsCtl->shared->latest_page_number = pageno;
655 0 : LWLockRelease(CommitTsControlLock);
656 :
657 : /*
658 : * If CommitTs is enabled, but it wasn't in the previous server run, we
659 : * need to set the oldest and newest values to the next Xid; that way, we
660 : * will not try to read data that might not have been set.
661 : *
662 : * XXX does this have a problem if a server is started with commitTs
663 : * enabled, then started with commitTs disabled, then restarted with it
664 : * enabled again? It doesn't look like it does, because there should be a
665 : * checkpoint that sets the value to InvalidTransactionId at end of
666 : * recovery; and so any chance of injecting new transactions without
667 : * CommitTs values would occur after the oldestCommitTsXid has been set to
668 : * Invalid temporarily.
669 : */
670 0 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
671 0 : if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
672 : {
673 0 : ShmemVariableCache->oldestCommitTsXid =
674 0 : ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
675 : }
676 0 : LWLockRelease(CommitTsLock);
677 :
678 : /* Create the current segment file, if necessary */
679 0 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
680 : {
681 : int slotno;
682 :
683 0 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
684 0 : slotno = ZeroCommitTsPage(pageno, false);
685 0 : SimpleLruWritePage(CommitTsCtl, slotno);
686 0 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687 0 : LWLockRelease(CommitTsControlLock);
688 : }
689 :
690 : /* Change the activation status in shared memory. */
691 0 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692 0 : commitTsShared->commitTsActive = true;
693 0 : LWLockRelease(CommitTsLock);
694 : }
695 :
696 : /*
697 : * Deactivate this module.
698 : *
699 : * This must be called when the track_commit_timestamp parameter is turned off.
700 : * This happens during postmaster or standalone-backend startup, or during WAL
701 : * replay.
702 : *
703 : * Resets CommitTs into invalid state to make sure we don't hand back
704 : * possibly-invalid data; also removes segments of old data.
705 : */
706 : static void
707 3 : DeactivateCommitTs(void)
708 : {
709 : /*
710 : * Cleanup the status in the shared memory.
711 : *
712 : * We reset everything in the commitTsShared record to prevent user from
713 : * getting confusing data about last committed transaction on the standby
714 : * when the module was activated repeatedly on the primary.
715 : */
716 3 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
717 :
718 3 : commitTsShared->commitTsActive = false;
719 3 : commitTsShared->xidLastCommit = InvalidTransactionId;
720 3 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721 3 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
722 :
723 3 : ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
724 3 : ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
725 :
726 3 : LWLockRelease(CommitTsLock);
727 :
728 : /*
729 : * Remove *all* files. This is necessary so that there are no leftover
730 : * files; in the case where this feature is later enabled after running
731 : * with it disabled for some time there may be a gap in the file sequence.
732 : * (We can probably tolerate out-of-sequence files, as they are going to
733 : * be overwritten anyway when we wrap around, but it seems better to be
734 : * tidy.)
735 : */
736 3 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
737 3 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
738 3 : LWLockRelease(CommitTsControlLock);
739 3 : }
740 :
741 : /*
742 : * This must be called ONCE during postmaster or standalone-backend shutdown
743 : */
744 : void
745 3 : ShutdownCommitTs(void)
746 : {
747 : /* Flush dirty CommitTs pages to disk */
748 3 : SimpleLruFlush(CommitTsCtl, false);
749 :
750 : /*
751 : * fsync pg_commit_ts to ensure that any files flushed previously are
752 : * durably on disk.
753 : */
754 3 : fsync_fname("pg_commit_ts", true);
755 3 : }
756 :
757 : /*
758 : * Perform a checkpoint --- either during shutdown, or on-the-fly
759 : */
760 : void
761 11 : CheckPointCommitTs(void)
762 : {
763 : /* Flush dirty CommitTs pages to disk */
764 11 : SimpleLruFlush(CommitTsCtl, true);
765 :
766 : /*
767 : * fsync pg_commit_ts to ensure that any files flushed previously are
768 : * durably on disk.
769 : */
770 11 : fsync_fname("pg_commit_ts", true);
771 11 : }
772 :
773 : /*
774 : * Make sure that CommitTs has room for a newly-allocated XID.
775 : *
776 : * NB: this is called while holding XidGenLock. We want it to be very fast
777 : * most of the time; even when it's not so fast, no actual I/O need happen
778 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
779 : * in shared memory.
780 : *
781 : * NB: the current implementation relies on track_commit_timestamp being
782 : * PGC_POSTMASTER.
783 : */
784 : void
785 10625 : ExtendCommitTs(TransactionId newestXact)
786 : {
787 : int pageno;
788 :
789 : /*
790 : * Nothing to do if module not enabled. Note we do an unlocked read of
791 : * the flag here, which is okay because this routine is only called from
792 : * GetNewTransactionId, which is never called in a standby.
793 : */
794 10625 : Assert(!InRecovery);
795 10625 : if (!commitTsShared->commitTsActive)
796 10625 : return;
797 :
798 : /*
799 : * No work except at first XID of a page. But beware: just after
800 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
801 : */
802 0 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
803 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
804 0 : return;
805 :
806 0 : pageno = TransactionIdToCTsPage(newestXact);
807 :
808 0 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
809 :
810 : /* Zero the page and make an XLOG entry about it */
811 0 : ZeroCommitTsPage(pageno, !InRecovery);
812 :
813 0 : LWLockRelease(CommitTsControlLock);
814 : }
815 :
816 : /*
817 : * Remove all CommitTs segments before the one holding the passed
818 : * transaction ID.
819 : *
820 : * Note that we don't need to flush XLOG here.
821 : */
822 : void
823 2 : TruncateCommitTs(TransactionId oldestXact)
824 : {
825 : int cutoffPage;
826 :
827 : /*
828 : * The cutoff point is the start of the segment containing oldestXact. We
829 : * pass the *page* containing oldestXact to SimpleLruTruncate.
830 : */
831 2 : cutoffPage = TransactionIdToCTsPage(oldestXact);
832 :
833 : /* Check to see if there's any files that could be removed */
834 2 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
835 : &cutoffPage))
836 4 : return; /* nothing to remove */
837 :
838 : /* Write XLOG record */
839 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
840 :
841 : /* Now we can remove the old CommitTs segment(s) */
842 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
843 : }
844 :
845 : /*
846 : * Set the limit values between which commit TS can be consulted.
847 : */
848 : void
849 4 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
850 : {
851 : /*
852 : * Be careful not to overwrite values that are either further into the
853 : * "future" or signal a disabled committs.
854 : */
855 4 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
856 4 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
857 : {
858 0 : if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
859 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
860 0 : if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
861 0 : ShmemVariableCache->newestCommitTsXid = newestXact;
862 : }
863 : else
864 : {
865 4 : Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
866 4 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
867 4 : ShmemVariableCache->newestCommitTsXid = newestXact;
868 : }
869 4 : LWLockRelease(CommitTsLock);
870 4 : }
871 :
872 : /*
873 : * Move forwards the oldest commitTS value that can be consulted
874 : */
875 : void
876 2 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
877 : {
878 2 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
879 2 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
880 0 : TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
881 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
882 2 : LWLockRelease(CommitTsLock);
883 2 : }
884 :
885 :
886 : /*
887 : * Decide which of two CLOG page numbers is "older" for truncation purposes.
888 : *
889 : * We need to use comparison of TransactionIds here in order to do the right
890 : * thing with wraparound XID arithmetic. However, if we are asked about
891 : * page number zero, we don't want to hand InvalidTransactionId to
892 : * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
893 : * offset both xids by FirstNormalTransactionId to avoid that.
894 : */
895 : static bool
896 0 : CommitTsPagePrecedes(int page1, int page2)
897 : {
898 : TransactionId xid1;
899 : TransactionId xid2;
900 :
901 0 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
902 0 : xid1 += FirstNormalTransactionId;
903 0 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
904 0 : xid2 += FirstNormalTransactionId;
905 :
906 0 : return TransactionIdPrecedes(xid1, xid2);
907 : }
908 :
909 :
910 : /*
911 : * Write a ZEROPAGE xlog record
912 : */
913 : static void
914 0 : WriteZeroPageXlogRec(int pageno)
915 : {
916 0 : XLogBeginInsert();
917 0 : XLogRegisterData((char *) (&pageno), sizeof(int));
918 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
919 0 : }
920 :
921 : /*
922 : * Write a TRUNCATE xlog record
923 : */
924 : static void
925 0 : WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
926 : {
927 : xl_commit_ts_truncate xlrec;
928 :
929 0 : xlrec.pageno = pageno;
930 0 : xlrec.oldestXid = oldestXid;
931 :
932 0 : XLogBeginInsert();
933 0 : XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
934 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
935 0 : }
936 :
937 : /*
938 : * Write a SETTS xlog record
939 : */
940 : static void
941 0 : WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
942 : TransactionId *subxids, TimestampTz timestamp,
943 : RepOriginId nodeid)
944 : {
945 : xl_commit_ts_set record;
946 :
947 0 : record.timestamp = timestamp;
948 0 : record.nodeid = nodeid;
949 0 : record.mainxid = mainxid;
950 :
951 0 : XLogBeginInsert();
952 0 : XLogRegisterData((char *) &record,
953 : offsetof(xl_commit_ts_set, mainxid) +
954 : sizeof(TransactionId));
955 0 : XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
956 0 : XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
957 0 : }
958 :
959 : /*
960 : * CommitTS resource manager's routines
961 : */
962 : void
963 0 : commit_ts_redo(XLogReaderState *record)
964 : {
965 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
966 :
967 : /* Backup blocks are not used in commit_ts records */
968 0 : Assert(!XLogRecHasAnyBlockRefs(record));
969 :
970 0 : if (info == COMMIT_TS_ZEROPAGE)
971 : {
972 : int pageno;
973 : int slotno;
974 :
975 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(int));
976 :
977 0 : LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
978 :
979 0 : slotno = ZeroCommitTsPage(pageno, false);
980 0 : SimpleLruWritePage(CommitTsCtl, slotno);
981 0 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
982 :
983 0 : LWLockRelease(CommitTsControlLock);
984 : }
985 0 : else if (info == COMMIT_TS_TRUNCATE)
986 : {
987 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
988 :
989 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
990 :
991 : /*
992 : * During XLOG replay, latest_page_number isn't set up yet; insert a
993 : * suitable value to bypass the sanity test in SimpleLruTruncate.
994 : */
995 0 : CommitTsCtl->shared->latest_page_number = trunc->pageno;
996 :
997 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
998 : }
999 0 : else if (info == COMMIT_TS_SETTS)
1000 : {
1001 0 : xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1002 : int nsubxids;
1003 : TransactionId *subxids;
1004 :
1005 0 : nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1006 : sizeof(TransactionId));
1007 0 : if (nsubxids > 0)
1008 : {
1009 0 : subxids = palloc(sizeof(TransactionId) * nsubxids);
1010 0 : memcpy(subxids,
1011 0 : XLogRecGetData(record) + SizeOfCommitTsSet,
1012 : sizeof(TransactionId) * nsubxids);
1013 : }
1014 : else
1015 0 : subxids = NULL;
1016 :
1017 0 : TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1018 0 : setts->timestamp, setts->nodeid, true);
1019 0 : if (subxids)
1020 0 : pfree(subxids);
1021 : }
1022 : else
1023 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1024 0 : }
|