]> granicus.if.org Git - postgresql/blob - src/backend/port/unix_latch.c
Introduce latches. A latch is a boolean variable, with the capability to
[postgresql] / src / backend / port / unix_latch.c
1 /*-------------------------------------------------------------------------
2  *
3  * unix_latch.c
4  *        Routines for inter-process latches
5  *
6  * A latch is a boolean variable, with operations that let you to sleep
7  * until it is set. A latch can be set from another process, or a signal
8  * handler within the same process.
9  *
10  * The latch interface is a reliable replacement for the common pattern of
11  * using pg_usleep() or select() to wait until a signal arrives, where the
12  * signal handler sets a global variable. Because on some platforms, an
13  * incoming signal doesn't interrupt sleep, and even on platforms where it
14  * does there is a race condition if the signal arrives just before
15  * entering the sleep, the common pattern must periodically wake up and
16  * poll the global variable. pselect() system call was invented to solve
17  * the problem, but it is not portable enough. Latches are designed to
18  * overcome these limitations, allowing you to sleep without polling and
19  * ensuring a quick response to signals from other processes.
20  *
21  * There are two kinds of latches: local and shared. A local latch is
22  * initialized by InitLatch, and can only be set from the same process.
23  * A local latch can be used to wait for a signal to arrive, by calling
24  * SetLatch in the signal handler. A shared latch resides in shared memory,
25  * and must be initialized at postmaster startup by InitSharedLatch. Before
26  * a shared latch can be waited on, it must be associated with a process
27  * with OwnLatch. Only the process owning the latch can wait on it, but any
28  * process can set it.
29  *
30  * There are three basic operations on a latch:
31  *
32  * SetLatch             - Sets the latch
33  * ResetLatch   - Clears the latch, allowing it to be set again
34  * WaitLatch    - Waits for the latch to become set
35  *
36  * The correct pattern to wait for an event is:
37  *
38  * for (;;)
39  * {
40  *     ResetLatch();
41  *     if (work to do)
42  *         Do Stuff();
43  *
44  *     WaitLatch();
45  * }
46  *
47  * It's important to reset the latch *before* checking if there's work to
48  * do. Otherwise, if someone sets the latch between the check and the
49  * ResetLatch call, you will miss it and Wait will block.
50  *
51  * To wake up the waiter, you must first set a global flag or something
52  * else that the main loop tests in the "if (work to do)" part, and call
53  * SetLatch *after* that. SetLatch is designed to return quickly if the
54  * latch is already set.
55  *
56  *
57  * Implementation
58  * --------------
59  *
60  * The Unix implementation uses the so-called self-pipe trick to overcome
61  * the race condition involved with select() and setting a global flag
62  * in the signal handler. When a latch is set and the current process
63  * is waiting for it, the signal handler wakes up the select() in
64  * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
65  * interrupt select() on all platforms, and even on platforms where it
66  * does, a signal that arrives just before the select() call does not
67  * prevent the select() from entering sleep. An incoming byte on a pipe
68  * however reliably interrupts the sleep, and makes select() to return
69  * immediately if the signal arrives just before select() begins.
70  *
71  * When SetLatch is called from the same process that owns the latch,
72  * SetLatch writes the byte directly to the pipe. If it's owned by another
73  * process, SIGUSR1 is sent and the signal handler in the waiting process
74  * writes the byte to the pipe on behalf of the signaling process.
75  *
76  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
77  * Portions Copyright (c) 1994, Regents of the University of California
78  *
79  * IDENTIFICATION
80  *        $PostgreSQL: pgsql/src/backend/port/unix_latch.c,v 1.1 2010/09/11 15:48:04 heikki Exp $
81  *
82  *-------------------------------------------------------------------------
83  */
84 #include "postgres.h"
85
86 #include <fcntl.h>
87 #include <signal.h>
88 #include <unistd.h>
89
90 #include "miscadmin.h"
91 #include "storage/latch.h"
92 #include "storage/shmem.h"
93
94 /* Are we currently in WaitLatch? The signal handler would like to know. */
95 static volatile sig_atomic_t waiting = false;
96
97 /* Read and write end of the self-pipe */
98 static int selfpipe_readfd = -1;
99 static int selfpipe_writefd = -1;
100
101 /* private function prototypes */
102 static void initSelfPipe(void);
103 static void drainSelfPipe(void);
104 static void sendSelfPipeByte(void);
105
106
107 /*
108  * Initialize a backend-local latch.
109  */
110 void
111 InitLatch(volatile Latch *latch)
112 {
113         /* Initialize the self pipe if this is our first latch in the process */
114         if (selfpipe_readfd == -1)
115                 initSelfPipe();
116
117         latch->is_set = false;
118         latch->owner_pid = MyProcPid;
119         latch->is_shared = false;
120 }
121
122 /*
123  * Initialize a shared latch that can be set from other processes. The latch
124  * is initially owned by no-one, use OwnLatch to associate it with the
125  * current process.
126  *
127  * NB: When you introduce a new shared latch, you must increase the shared
128  * latch count in NumSharedLatches in win32_latch.c!
129  */
130 void
131 InitSharedLatch(volatile Latch *latch)
132 {
133         latch->is_set = false;
134         latch->owner_pid = 0;
135         latch->is_shared = true;
136 }
137
138 /*
139  * Associate a shared latch with the current process, allowing it to
140  * wait on it.
141  *
142  * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal
143  * handler, as shared latches use SIGUSR1 to for inter-process communication.
144  */
145 void
146 OwnLatch(volatile Latch *latch)
147 {
148         Assert(latch->is_shared);
149
150         /* Initialize the self pipe if this is our first latch in the process */
151         if (selfpipe_readfd == -1)
152                 initSelfPipe();
153
154         if (latch->owner_pid != 0)
155                 elog(ERROR, "latch already owned");
156         latch->owner_pid = MyProcPid;
157 }
158
159 /*
160  * Disown a shared latch currently owned by the current process.
161  */
162 void
163 DisownLatch(volatile Latch *latch)
164 {
165         Assert(latch->is_shared);
166         Assert(latch->owner_pid == MyProcPid);
167         latch->owner_pid = 0;
168 }
169
170 /*
171  * Wait for given latch to be set or until timeout is exceeded.
172  * If the latch is already set, the function returns immediately.
173  *
174  * The 'timeout' is given in microseconds, and -1 means wait forever.
175  * On some platforms, signals cause the timeout to be restarted, so beware
176  * that the function can sleep for several times longer than the specified
177  * timeout.
178  *
179  * The latch must be owned by the current process, ie. it must be a
180  * backend-local latch initialized with InitLatch, or a shared latch
181  * associated with the current process by calling OwnLatch.
182  *
183  * Returns 'true' if the latch was set, or 'false' if timeout was reached.
184  */
185 bool
186 WaitLatch(volatile Latch *latch, long timeout)
187 {
188         return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
189 }
190
191 /*
192  * Like WaitLatch, but will also return when there's data available in
193  * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
194  * was set, or 2 if the scoket became readable.
195  */
196 int
197 WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
198 {
199         struct timeval tv, *tvp = NULL;
200         fd_set          input_mask;
201         int                     rc;
202         int                     result = 0;
203
204         if (latch->owner_pid != MyProcPid)
205                 elog(ERROR, "cannot wait on a latch owned by another process");
206
207         /* Initialize timeout */
208         if (timeout >= 0)
209         {
210                 tv.tv_sec = timeout / 1000000L;
211                 tv.tv_usec = timeout % 1000000L;
212                 tvp = &tv;
213         }
214
215         waiting = true;
216         for (;;)
217         {
218                 int hifd;
219
220                 /*
221                  * Clear the pipe, and check if the latch is set already. If someone
222                  * sets the latch between this and the select() below, the setter
223                  * will write a byte to the pipe (or signal us and the signal handler
224                  * will do that), and the select() will return immediately.
225                  */
226                 drainSelfPipe();
227                 if (latch->is_set)
228                 {
229                         result = 1;
230                         break;
231                 }
232
233                 FD_ZERO(&input_mask);
234                 FD_SET(selfpipe_readfd, &input_mask);
235                 hifd = selfpipe_readfd;
236                 if (sock != PGINVALID_SOCKET)
237                 {
238                         FD_SET(sock, &input_mask);
239                         if (sock > hifd)
240                                 hifd = sock;
241                 }
242
243                 rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
244                 if (rc < 0)
245                 {
246                         if (errno == EINTR)
247                                 continue;
248                         ereport(ERROR,
249                                         (errcode_for_socket_access(),
250                                          errmsg("select() failed: %m")));
251                 }
252                 if (rc == 0)
253                 {
254                         /* timeout exceeded */
255                         result = 0;
256                         break;
257                 }
258                 if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
259                 {
260                         result = 2;
261                         break;          /* data available in socket */
262                 }
263         }
264         waiting = false;
265
266         return result;
267 }
268
269 /*
270  * Sets a latch and wakes up anyone waiting on it. Returns quickly if the
271  * latch is already set.
272  */
273 void
274 SetLatch(volatile Latch *latch)
275 {
276         pid_t owner_pid;
277
278         /* Quick exit if already set */
279         if (latch->is_set)
280                 return;
281
282         latch->is_set = true;
283
284         /*
285          * See if anyone's waiting for the latch. It can be the current process
286          * if we're in a signal handler. We use the self-pipe to wake up the
287          * select() in that case. If it's another process, send a signal.
288          *
289          * Fetch owner_pid only once, in case the owner simultaneously disowns
290          * the latch and clears owner_pid. XXX: This assumes that pid_t is
291          * atomic, which isn't guaranteed to be true! In practice, the effective
292          * range of pid_t fits in a 32 bit integer, and so should be atomic. In
293          * the worst case, we might end up signaling wrong process if the right
294          * one disowns the latch just as we fetch owner_pid. Even then, you're
295          * very unlucky if a process with that bogus pid exists.
296          */
297         owner_pid = latch->owner_pid;
298         if (owner_pid == 0)
299                 return;
300         else if (owner_pid == MyProcPid)
301                 sendSelfPipeByte();
302         else
303                 kill(owner_pid, SIGUSR1);
304 }
305
306 /*
307  * Clear the latch. Calling WaitLatch after this will sleep, unless
308  * the latch is set again before the WaitLatch call.
309  */
310 void
311 ResetLatch(volatile Latch *latch)
312 {
313         /* Only the owner should reset the latch */
314         Assert(latch->owner_pid == MyProcPid);
315
316         latch->is_set = false;
317 }
318
319 /*
320  * LatchShmemSize
321  *              Compute space needed for latch's shared memory
322  *
323  * Not needed for Unix implementation.
324  */
325 Size
326 LatchShmemSize(void)
327 {
328         return 0;
329 }
330
331 /*
332  * LatchShmemInit
333  *              Allocate and initialize shared memory needed for latches
334  *
335  * Not needed for Unix implementation.
336  */
337 void
338 LatchShmemInit(void)
339 {
340 }
341
342 /*
343  * SetLatch uses SIGUSR1 to wake up the process waiting on the latch. Wake
344  * up WaitLatch.
345  */
346 void
347 latch_sigusr1_handler(void)
348 {
349         if (waiting)
350                 sendSelfPipeByte();
351 }
352
353 /* initialize the self-pipe */
354 static void
355 initSelfPipe(void)
356 {
357         int pipefd[2];
358
359         /*
360          * Set up the self-pipe that allows a signal handler to wake up the
361          * select() in WaitLatch. Make the write-end non-blocking, so that
362          * SetLatch won't block if the event has already been set many times
363          * filling the kernel buffer. Make the read-end non-blocking too, so
364          * that we can easily clear the pipe by reading until EAGAIN or
365          * EWOULDBLOCK.
366          */
367         if (pipe(pipefd) < 0)
368                 elog(FATAL, "pipe() failed: %m");
369         if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
370                 elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
371         if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
372                 elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");
373
374         selfpipe_readfd = pipefd[0];
375         selfpipe_writefd = pipefd[1];
376 }
377
378 /* Send one byte to the self-pipe, to wake up WaitLatch */
379 static void
380 sendSelfPipeByte(void)
381 {
382         int rc;
383         char dummy = 0;
384
385 retry:
386         rc = write(selfpipe_writefd, &dummy, 1);
387         if (rc < 0)
388         {
389                 /* If interrupted by signal, just retry */
390                 if (errno == EINTR)
391                         goto retry;
392
393                 /*
394                  * If the pipe is full, we don't need to retry, the data that's
395                  * there already is enough to wake up WaitLatch.
396                  */
397                 if (errno == EAGAIN || errno == EWOULDBLOCK)
398                         return;
399
400                 /*
401                  * Oops, the write() failed for some other reason. We might be in
402                  * a signal handler, so it's not safe to elog(). We have no choice
403                  * but silently ignore the error.
404                  */
405                 return;
406         }
407 }
408
409 /* Read all available data from the self-pipe */
410 static void
411 drainSelfPipe(void)
412 {
413         /*
414          * There shouldn't normally be more than one byte in the pipe, or maybe
415          * a few more if multiple processes run SetLatch at the same instant.
416          */
417         char buf[16];
418         int rc;
419
420         for (;;)
421         {
422                 rc = read(selfpipe_readfd, buf, sizeof(buf));
423                 if (rc < 0)
424                 {
425                         if (errno == EAGAIN || errno == EWOULDBLOCK)
426                                 break;          /* the pipe is empty */
427                         else if (errno == EINTR)
428                                 continue;       /* retry */
429                         else
430                                 elog(ERROR, "read() on self-pipe failed: %m");
431                 }
432                 else if (rc == 0)
433                         elog(ERROR, "unexpected EOF on self-pipe");
434         }
435 }