Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinval.c
4 : * POSTGRES shared cache invalidation communication code.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinval.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/xact.h"
18 : #include "commands/async.h"
19 : #include "miscadmin.h"
20 : #include "storage/ipc.h"
21 : #include "storage/proc.h"
22 : #include "storage/sinvaladt.h"
23 : #include "utils/inval.h"
24 :
25 :
26 : uint64 SharedInvalidMessageCounter;
27 :
28 :
29 : /*
30 : * Because backends sitting idle will not be reading sinval events, we
31 : * need a way to give an idle backend a swift kick in the rear and make
32 : * it catch up before the sinval queue overflows and forces it to go
33 : * through a cache reset exercise. This is done by sending
34 : * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
35 : *
36 : * The signal handler will set an interrupt pending flag and will set the
37 : * processes latch. Whenever starting to read from the client, or when
38 : * interrupted while doing so, ProcessClientReadInterrupt() will call
39 : * ProcessCatchupEvent().
40 : */
41 : volatile sig_atomic_t catchupInterruptPending = false;
42 :
43 :
44 : /*
45 : * SendSharedInvalidMessages
46 : * Add shared-cache-invalidation message(s) to the global SI message queue.
47 : */
48 : void
49 43219 : SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
50 : {
51 43219 : SIInsertDataEntries(msgs, n);
52 43219 : }
53 :
54 : /*
55 : * ReceiveSharedInvalidMessages
56 : * Process shared-cache-invalidation messages waiting for this backend
57 : *
58 : * We guarantee to process all messages that had been queued before the
59 : * routine was entered. It is of course possible for more messages to get
60 : * queued right after our last SIGetDataEntries call.
61 : *
62 : * NOTE: it is entirely possible for this routine to be invoked recursively
63 : * as a consequence of processing inside the invalFunction or resetFunction.
64 : * Furthermore, such a recursive call must guarantee that all outstanding
65 : * inval messages have been processed before it exits. This is the reason
66 : * for the strange-looking choice to use a statically allocated buffer array
67 : * and counters; it's so that a recursive call can process messages already
68 : * sucked out of sinvaladt.c.
69 : */
70 : void
71 906822 : ReceiveSharedInvalidMessages(
72 : void (*invalFunction) (SharedInvalidationMessage *msg),
73 : void (*resetFunction) (void))
74 : {
75 : #define MAXINVALMSGS 32
76 : static SharedInvalidationMessage messages[MAXINVALMSGS];
77 :
78 : /*
79 : * We use volatile here to prevent bugs if a compiler doesn't realize that
80 : * recursion is a possibility ...
81 : */
82 : static volatile int nextmsg = 0;
83 : static volatile int nummsgs = 0;
84 :
85 : /* Deal with any messages still pending from an outer recursion */
86 1813644 : while (nextmsg < nummsgs)
87 : {
88 0 : SharedInvalidationMessage msg = messages[nextmsg++];
89 :
90 0 : SharedInvalidMessageCounter++;
91 0 : invalFunction(&msg);
92 : }
93 :
94 : do
95 : {
96 : int getResult;
97 :
98 934366 : nextmsg = nummsgs = 0;
99 :
100 : /* Try to get some more messages */
101 934366 : getResult = SIGetDataEntries(messages, MAXINVALMSGS);
102 :
103 934366 : if (getResult < 0)
104 : {
105 : /* got a reset message */
106 27 : elog(DEBUG4, "cache state reset");
107 27 : SharedInvalidMessageCounter++;
108 27 : resetFunction();
109 27 : break; /* nothing more to do */
110 : }
111 :
112 : /* Process them, being wary that a recursive call might eat some */
113 934339 : nextmsg = 0;
114 934339 : nummsgs = getResult;
115 :
116 2898386 : while (nextmsg < nummsgs)
117 : {
118 1029708 : SharedInvalidationMessage msg = messages[nextmsg++];
119 :
120 1029708 : SharedInvalidMessageCounter++;
121 1029708 : invalFunction(&msg);
122 : }
123 :
124 : /*
125 : * We only need to loop if the last SIGetDataEntries call (which might
126 : * have been within a recursive call) returned a full buffer.
127 : */
128 934339 : } while (nummsgs == MAXINVALMSGS);
129 :
130 : /*
131 : * We are now caught up. If we received a catchup signal, reset that
132 : * flag, and call SICleanupQueue(). This is not so much because we need
133 : * to flush dead messages right now, as that we want to pass on the
134 : * catchup signal to the next slowest backend. "Daisy chaining" the
135 : * catchup signal this way avoids creating spikes in system load for what
136 : * should be just a background maintenance activity.
137 : */
138 906822 : if (catchupInterruptPending)
139 : {
140 213 : catchupInterruptPending = false;
141 213 : elog(DEBUG4, "sinval catchup complete, cleaning queue");
142 213 : SICleanupQueue(false, 0);
143 : }
144 906822 : }
145 :
146 :
147 : /*
148 : * HandleCatchupInterrupt
149 : *
150 : * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
151 : *
152 : * We used to directly call ProcessCatchupEvent directly when idle. These days
153 : * we just set a flag to do it later and notify the process of that fact by
154 : * setting the process's latch.
155 : */
156 : void
157 218 : HandleCatchupInterrupt(void)
158 : {
159 : /*
160 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what
161 : * you do here.
162 : */
163 :
164 218 : catchupInterruptPending = true;
165 :
166 : /* make sure the event is processed in due course */
167 218 : SetLatch(MyLatch);
168 218 : }
169 :
170 : /*
171 : * ProcessCatchupInterrupt
172 : *
173 : * The portion of catchup interrupt handling that runs outside of the signal
174 : * handler, which allows it to actually process pending invalidations.
175 : */
176 : void
177 145 : ProcessCatchupInterrupt(void)
178 : {
179 421 : while (catchupInterruptPending)
180 : {
181 : /*
182 : * What we need to do here is cause ReceiveSharedInvalidMessages() to
183 : * run, which will do the necessary work and also reset the
184 : * catchupInterruptPending flag. If we are inside a transaction we
185 : * can just call AcceptInvalidationMessages() to do this. If we
186 : * aren't, we start and immediately end a transaction; the call to
187 : * AcceptInvalidationMessages() happens down inside transaction start.
188 : *
189 : * It is awfully tempting to just call AcceptInvalidationMessages()
190 : * without the rest of the xact start/stop overhead, and I think that
191 : * would actually work in the normal case; but I am not sure that
192 : * things would clean up nicely if we got an error partway through.
193 : */
194 131 : if (IsTransactionOrTransactionBlock())
195 : {
196 2 : elog(DEBUG4, "ProcessCatchupEvent inside transaction");
197 2 : AcceptInvalidationMessages();
198 : }
199 : else
200 : {
201 129 : elog(DEBUG4, "ProcessCatchupEvent outside transaction");
202 129 : StartTransactionCommand();
203 129 : CommitTransactionCommand();
204 : }
205 : }
206 145 : }
|