Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinvaladt.c
4 : * POSTGRES shared cache invalidation data manager.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinvaladt.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <signal.h>
18 : #include <unistd.h>
19 :
20 : #include "miscadmin.h"
21 : #include "storage/backendid.h"
22 : #include "storage/ipc.h"
23 : #include "storage/proc.h"
24 : #include "storage/procsignal.h"
25 : #include "storage/shmem.h"
26 : #include "storage/sinvaladt.h"
27 : #include "storage/spin.h"
28 : #include "access/transam.h"
29 :
30 :
31 : /*
32 : * Conceptually, the shared cache invalidation messages are stored in an
33 : * infinite array, where maxMsgNum is the next array subscript to store a
34 : * submitted message in, minMsgNum is the smallest array subscript containing
35 : * a message not yet read by all backends, and we always have maxMsgNum >=
36 : * minMsgNum. (They are equal when there are no messages pending.) For each
37 : * active backend, there is a nextMsgNum pointer indicating the next message it
38 : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
39 : * backend.
40 : *
41 : * (In the current implementation, minMsgNum is a lower bound for the
42 : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
43 : * smallest nextMsgNum --- it may lag behind. We only update it when
44 : * SICleanupQueue is called, and we try not to do that often.)
45 : *
46 : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
47 : * entries. We translate MsgNum values into circular-buffer indexes by
48 : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
49 : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
50 : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
51 : * in the buffer. If the buffer does overflow, we recover by setting the
52 : * "reset" flag for each backend that has fallen too far behind. A backend
53 : * that is in "reset" state is ignored while determining minMsgNum. When
54 : * it does finally attempt to receive inval messages, it must discard all
55 : * its invalidatable state, since it won't know what it missed.
56 : *
57 : * To reduce the probability of needing resets, we send a "catchup" interrupt
58 : * to any backend that seems to be falling unreasonably far behind. The
59 : * normal behavior is that at most one such interrupt is in flight at a time;
60 : * when a backend completes processing a catchup interrupt, it executes
61 : * SICleanupQueue, which will signal the next-furthest-behind backend if
62 : * needed. This avoids undue contention from multiple backends all trying
63 : * to catch up at once. However, the furthest-back backend might be stuck
64 : * in a state where it can't catch up. Eventually it will get reset, so it
65 : * won't cause any more problems for anyone but itself. But we don't want
66 : * to find that a bunch of other backends are now too close to the reset
67 : * threshold to be saved. So SICleanupQueue is designed to occasionally
68 : * send extra catchup interrupts as the queue gets fuller, to backends that
69 : * are far behind and haven't gotten one yet. As long as there aren't a lot
70 : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
71 : * that aren't stuck will propagate their interrupts to the next guy.
72 : *
73 : * We would have problems if the MsgNum values overflow an integer, so
74 : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
75 : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
76 : * large so that we don't need to do this often. It must be a multiple of
77 : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
78 : * to be moved when we do it.
79 : *
80 : * Access to the shared sinval array is protected by two locks, SInvalReadLock
81 : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
82 : * authorizes them to modify their own ProcState but not to modify or even
83 : * look at anyone else's. When we need to perform array-wide updates,
84 : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
85 : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
86 : * mode) to serialize adding messages to the queue. Note that a writer
87 : * can operate in parallel with one or more readers, because the writer
88 : * has no need to touch anyone's ProcState, except in the infrequent cases
89 : * when SICleanupQueue is needed. The only point of overlap is that
90 : * the writer wants to change maxMsgNum while readers need to read it.
91 : * We deal with that by having a spinlock that readers must take for just
92 : * long enough to read maxMsgNum, while writers take it for just long enough
93 : * to write maxMsgNum. (The exact rule is that you need the spinlock to
94 : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
95 : * spinlock to write maxMsgNum unless you are holding both locks.)
96 : *
97 : * Note: since maxMsgNum is an int and hence presumably atomically readable/
98 : * writable, the spinlock might seem unnecessary. The reason it is needed
99 : * is to provide a memory barrier: we need to be sure that messages written
100 : * to the array are actually there before maxMsgNum is increased, and that
101 : * readers will see that data after fetching maxMsgNum. Multiprocessors
102 : * that have weak memory-ordering guarantees can fail without the memory
103 : * barrier instructions that are included in the spinlock sequences.
104 : */
105 :
106 :
107 : /*
108 : * Configurable parameters.
109 : *
110 : * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
111 : * Must be a power of 2 for speed.
112 : *
113 : * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
114 : * Must be a multiple of MAXNUMMESSAGES. Should be large.
115 : *
116 : * CLEANUP_MIN: the minimum number of messages that must be in the buffer
117 : * before we bother to call SICleanupQueue.
118 : *
119 : * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
120 : * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
121 : *
122 : * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
123 : * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
124 : *
125 : * WRITE_QUANTUM: the max number of messages to push into the buffer per
126 : * iteration of SIInsertDataEntries. Noncritical but should be less than
127 : * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
128 : * per iteration.
129 : */
130 :
131 : #define MAXNUMMESSAGES 4096
132 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
133 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
134 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
135 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
136 : #define WRITE_QUANTUM 64
137 :
138 : /* Per-backend state in shared invalidation structure */
139 : typedef struct ProcState
140 : {
141 : /* procPid is zero in an inactive ProcState array entry. */
142 : pid_t procPid; /* PID of backend, for signaling */
143 : PGPROC *proc; /* PGPROC of backend */
144 : /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
145 : int nextMsgNum; /* next message number to read */
146 : bool resetState; /* backend needs to reset its state */
147 : bool signaled; /* backend has been sent catchup signal */
148 : bool hasMessages; /* backend has unread messages */
149 :
150 : /*
151 : * Backend only sends invalidations, never receives them. This only makes
152 : * sense for Startup process during recovery because it doesn't maintain a
153 : * relcache, yet it fires inval messages to allow query backends to see
154 : * schema changes.
155 : */
156 : bool sendOnly; /* backend only sends, never receives */
157 :
158 : /*
159 : * Next LocalTransactionId to use for each idle backend slot. We keep
160 : * this here because it is indexed by BackendId and it is convenient to
161 : * copy the value to and from local memory when MyBackendId is set. It's
162 : * meaningless in an active ProcState entry.
163 : */
164 : LocalTransactionId nextLXID;
165 : } ProcState;
166 :
167 : /* Shared cache invalidation memory segment */
168 : typedef struct SISeg
169 : {
170 : /*
171 : * General state information
172 : */
173 : int minMsgNum; /* oldest message still needed */
174 : int maxMsgNum; /* next message number to be assigned */
175 : int nextThreshold; /* # of messages to call SICleanupQueue */
176 : int lastBackend; /* index of last active procState entry, +1 */
177 : int maxBackends; /* size of procState array */
178 :
179 : slock_t msgnumLock; /* spinlock protecting maxMsgNum */
180 :
181 : /*
182 : * Circular buffer holding shared-inval messages
183 : */
184 : SharedInvalidationMessage buffer[MAXNUMMESSAGES];
185 :
186 : /*
187 : * Per-backend invalidation state info (has MaxBackends entries).
188 : */
189 : ProcState procState[FLEXIBLE_ARRAY_MEMBER];
190 : } SISeg;
191 :
192 : static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
193 :
194 :
195 : static LocalTransactionId nextLocalTransactionId;
196 :
197 : static void CleanupInvalidationState(int status, Datum arg);
198 :
199 :
200 : /*
201 : * SInvalShmemSize --- return shared-memory space needed
202 : */
203 : Size
204 10 : SInvalShmemSize(void)
205 : {
206 : Size size;
207 :
208 10 : size = offsetof(SISeg, procState);
209 10 : size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210 :
211 10 : return size;
212 : }
213 :
214 : /*
215 : * CreateSharedInvalidationState
216 : * Create and initialize the SI message buffer
217 : */
218 : void
219 5 : CreateSharedInvalidationState(void)
220 : {
221 : int i;
222 : bool found;
223 :
224 : /* Allocate space in shared memory */
225 5 : shmInvalBuffer = (SISeg *)
226 5 : ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227 5 : if (found)
228 5 : return;
229 :
230 : /* Clear message counters, save size of procState array, init spinlock */
231 5 : shmInvalBuffer->minMsgNum = 0;
232 5 : shmInvalBuffer->maxMsgNum = 0;
233 5 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
234 5 : shmInvalBuffer->lastBackend = 0;
235 5 : shmInvalBuffer->maxBackends = MaxBackends;
236 5 : SpinLockInit(&shmInvalBuffer->msgnumLock);
237 :
238 : /* The buffer[] array is initially all unused, so we need not fill it */
239 :
240 : /* Mark all backends inactive, and initialize nextLXID */
241 565 : for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242 : {
243 560 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
244 560 : shmInvalBuffer->procState[i].proc = NULL;
245 560 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
246 560 : shmInvalBuffer->procState[i].resetState = false;
247 560 : shmInvalBuffer->procState[i].signaled = false;
248 560 : shmInvalBuffer->procState[i].hasMessages = false;
249 560 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
250 : }
251 : }
252 :
/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 *
 * Claims a free procState slot and sets MyBackendId to the slot index + 1,
 * marking the entry active with all extant messages treated as already read
 * (nextMsgNum starts at maxMsgNum).  If sendOnly is true, the backend is
 * registered as a writer that never consumes messages (per the ProcState
 * comments, this is used by the startup process during recovery).
 *
 * Reports FATAL with ERRCODE_TOO_MANY_CONNECTIONS if no slot is free.
 * Also registers an on_shmem_exit callback to release the slot at exit.
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on lastBackend to set
	 * hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Look for a free entry in the procState array */
	for (index = 0; index < segP->lastBackend; index++)
	{
		if (segP->procState[index].procPid == 0)	/* inactive slot? */
		{
			stateP = &segP->procState[index];
			break;
		}
	}

	/* No hole found: try to extend the active region of the array */
	if (stateP == NULL)
	{
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}
		else
		{
			/*
			 * out of procState slots: MaxBackends exceeded -- report normally
			 */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,
					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
					 errmsg("sorry, too many clients already")));
		}
	}

	/* Backend IDs are 1-based; slot 0 is backend ID 1 */
	MyBackendId = (stateP - &segP->procState[0]) + 1;

	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
326 :
327 : /*
328 : * CleanupInvalidationState
329 : * Mark the current backend as no longer active.
330 : *
331 : * This function is called via on_shmem_exit() during backend shutdown.
332 : *
333 : * arg is really of type "SISeg*".
334 : */
335 : static void
336 338 : CleanupInvalidationState(int status, Datum arg)
337 : {
338 338 : SISeg *segP = (SISeg *) DatumGetPointer(arg);
339 : ProcState *stateP;
340 : int i;
341 :
342 338 : Assert(PointerIsValid(segP));
343 :
344 338 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
345 :
346 338 : stateP = &segP->procState[MyBackendId - 1];
347 :
348 : /* Update next local transaction ID for next holder of this backendID */
349 338 : stateP->nextLXID = nextLocalTransactionId;
350 :
351 : /* Mark myself inactive */
352 338 : stateP->procPid = 0;
353 338 : stateP->proc = NULL;
354 338 : stateP->nextMsgNum = 0;
355 338 : stateP->resetState = false;
356 338 : stateP->signaled = false;
357 :
358 : /* Recompute index of last active backend */
359 609 : for (i = segP->lastBackend; i > 0; i--)
360 : {
361 606 : if (segP->procState[i - 1].procPid != 0)
362 335 : break;
363 : }
364 338 : segP->lastBackend = i;
365 :
366 338 : LWLockRelease(SInvalWriteLock);
367 338 : }
368 :
369 : /*
370 : * BackendIdGetProc
371 : * Get the PGPROC structure for a backend, given the backend ID.
372 : * The result may be out of date arbitrarily quickly, so the caller
373 : * must be careful about how this information is used. NULL is
374 : * returned if the backend is not active.
375 : */
376 : PGPROC *
377 0 : BackendIdGetProc(int backendID)
378 : {
379 0 : PGPROC *result = NULL;
380 0 : SISeg *segP = shmInvalBuffer;
381 :
382 : /* Need to lock out additions/removals of backends */
383 0 : LWLockAcquire(SInvalWriteLock, LW_SHARED);
384 :
385 0 : if (backendID > 0 && backendID <= segP->lastBackend)
386 : {
387 0 : ProcState *stateP = &segP->procState[backendID - 1];
388 :
389 0 : result = stateP->proc;
390 : }
391 :
392 0 : LWLockRelease(SInvalWriteLock);
393 :
394 0 : return result;
395 : }
396 :
397 : /*
398 : * BackendIdGetTransactionIds
399 : * Get the xid and xmin of the backend. The result may be out of date
400 : * arbitrarily quickly, so the caller must be careful about how this
401 : * information is used.
402 : */
403 : void
404 0 : BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
405 : {
406 0 : SISeg *segP = shmInvalBuffer;
407 :
408 0 : *xid = InvalidTransactionId;
409 0 : *xmin = InvalidTransactionId;
410 :
411 : /* Need to lock out additions/removals of backends */
412 0 : LWLockAcquire(SInvalWriteLock, LW_SHARED);
413 :
414 0 : if (backendID > 0 && backendID <= segP->lastBackend)
415 : {
416 0 : ProcState *stateP = &segP->procState[backendID - 1];
417 0 : PGPROC *proc = stateP->proc;
418 :
419 0 : if (proc != NULL)
420 : {
421 0 : PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno];
422 :
423 0 : *xid = xact->xid;
424 0 : *xmin = xact->xmin;
425 : }
426 : }
427 :
428 0 : LWLockRelease(SInvalWriteLock);
429 0 : }
430 :
/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 *
 * Appends data[0..n-1] to the circular buffer under SInvalWriteLock,
 * working in groups of at most WRITE_QUANTUM messages and calling
 * SICleanupQueue whenever the buffer would overflow or the fullness
 * threshold is exceeded.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/*
		 * Update current value of maxMsgNum using spinlock.  The spinlock
		 * also acts as a memory barrier so readers cannot see the new
		 * maxMsgNum before the buffer writes above (see file header).
		 */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->lastBackend; i++)
		{
			ProcState  *stateP = &segP->procState[i];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
509 :
/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock preventing against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/*
	 * Fetch current value of maxMsgNum using spinlock; the spinlock doubles
	 * as a memory barrier so we see the buffer contents the writer stored
	 * before advancing maxMsgNum (see file header).
	 */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so that
	 * we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
625 :
/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;	/* signal backends behind this point */
	lowbound = min - MAXNUMMESSAGES + minFree;	/* reset backends behind this */

	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive or already in reset state */
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;

		/* copy pid/id before dropping locks; set signaled while still locked */
		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		/* restore the lock state the caller expects on return */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
753 :
754 :
755 : /*
756 : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
757 : *
758 : * We split VirtualTransactionIds into two parts so that it is possible
759 : * to allocate a new one without any contention for shared memory, except
760 : * for a bit of additional overhead during backend startup/shutdown.
761 : * The high-order part of a VirtualTransactionId is a BackendId, and the
762 : * low-order part is a LocalTransactionId, which we assign from a local
763 : * counter. To avoid the risk of a VirtualTransactionId being reused
764 : * within a short interval, successive procs occupying the same backend ID
765 : * slot should use a consecutive sequence of local IDs, which is implemented
766 : * by copying nextLocalTransactionId as seen above.
767 : */
768 : LocalTransactionId
769 26188 : GetNextLocalTransactionId(void)
770 : {
771 : LocalTransactionId result;
772 :
773 : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
774 : do
775 : {
776 26188 : result = nextLocalTransactionId++;
777 26188 : } while (!LocalTransactionIdIsValid(result));
778 :
779 26167 : return result;
780 : }
|