]> granicus.if.org Git - postgresql/blob - src/backend/replication/walreceiverfuncs.c
pgindent run for 9.0, second run
[postgresql] / src / backend / replication / walreceiverfuncs.c
1 /*-------------------------------------------------------------------------
2  *
3  * walreceiverfuncs.c
4  *
5  * This file contains functions used by the startup process to communicate
6  * with the walreceiver process. Functions implementing walreceiver itself
7  * are in walreceiver.c.
8  *
9  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *        $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.7 2010/07/06 19:18:57 momjian Exp $
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/time.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <signal.h>
25
26 #include "access/xlog_internal.h"
27 #include "replication/walreceiver.h"
28 #include "storage/fd.h"
29 #include "storage/pmsignal.h"
30 #include "storage/shmem.h"
31 #include "utils/guc.h"
32
/*
 * Pointer to the walreceiver control structure.  It stays NULL until
 * WalRcvShmemInit() points it at the "Wal Receiver Ctl" struct obtained
 * from ShmemInitStruct().
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * WalRcvInProgress() compares this against the time elapsed since the
 * recorded startTime while the state is still WALRCV_STARTING.
 */
#define WALRCV_STARTUP_TIMEOUT 10
40
41 /* Report shared memory space needed by WalRcvShmemInit */
42 Size
43 WalRcvShmemSize(void)
44 {
45         Size            size = 0;
46
47         size = add_size(size, sizeof(WalRcvData));
48
49         return size;
50 }
51
52 /* Allocate and initialize walreceiver-related shared memory */
53 void
54 WalRcvShmemInit(void)
55 {
56         bool            found;
57
58         WalRcv = (WalRcvData *)
59                 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60
61         if (!found)
62         {
63                 /* First time through, so initialize */
64                 MemSet(WalRcv, 0, WalRcvShmemSize());
65                 WalRcv->walRcvState = WALRCV_STOPPED;
66                 SpinLockInit(&WalRcv->mutex);
67         }
68 }
69
70 /* Is walreceiver in progress (or starting up)? */
71 bool
72 WalRcvInProgress(void)
73 {
74         /* use volatile pointer to prevent code rearrangement */
75         volatile WalRcvData *walrcv = WalRcv;
76         WalRcvState state;
77         pg_time_t       startTime;
78
79         SpinLockAcquire(&walrcv->mutex);
80
81         state = walrcv->walRcvState;
82         startTime = walrcv->startTime;
83
84         SpinLockRelease(&walrcv->mutex);
85
86         /*
87          * If it has taken too long for walreceiver to start up, give up. Setting
88          * the state to STOPPED ensures that if walreceiver later does start up
89          * after all, it will see that it's not supposed to be running and die
90          * without doing anything.
91          */
92         if (state == WALRCV_STARTING)
93         {
94                 pg_time_t       now = (pg_time_t) time(NULL);
95
96                 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97                 {
98                         SpinLockAcquire(&walrcv->mutex);
99
100                         if (walrcv->walRcvState == WALRCV_STARTING)
101                                 state = walrcv->walRcvState = WALRCV_STOPPED;
102
103                         SpinLockRelease(&walrcv->mutex);
104                 }
105         }
106
107         if (state != WALRCV_STOPPED)
108                 return true;
109         else
110                 return false;
111 }
112
113 /*
114  * Stop walreceiver (if running) and wait for it to die.
115  */
116 void
117 ShutdownWalRcv(void)
118 {
119         /* use volatile pointer to prevent code rearrangement */
120         volatile WalRcvData *walrcv = WalRcv;
121         pid_t           walrcvpid = 0;
122
123         /*
124          * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
125          * mode once it's finished, and will also request postmaster to not
126          * restart itself.
127          */
128         SpinLockAcquire(&walrcv->mutex);
129         switch (walrcv->walRcvState)
130         {
131                 case WALRCV_STOPPED:
132                         break;
133                 case WALRCV_STARTING:
134                         walrcv->walRcvState = WALRCV_STOPPED;
135                         break;
136
137                 case WALRCV_RUNNING:
138                         walrcv->walRcvState = WALRCV_STOPPING;
139                         /* fall through */
140                 case WALRCV_STOPPING:
141                         walrcvpid = walrcv->pid;
142                         break;
143         }
144         SpinLockRelease(&walrcv->mutex);
145
146         /*
147          * Signal walreceiver process if it was still running.
148          */
149         if (walrcvpid != 0)
150                 kill(walrcvpid, SIGTERM);
151
152         /*
153          * Wait for walreceiver to acknowledge its death by setting state to
154          * WALRCV_STOPPED.
155          */
156         while (WalRcvInProgress())
157         {
158                 /*
159                  * This possibly-long loop needs to handle interrupts of startup
160                  * process.
161                  */
162                 HandleStartupProcInterrupts();
163
164                 pg_usleep(100000);              /* 100ms */
165         }
166 }
167
168 /*
169  * Request postmaster to start walreceiver.
170  *
171  * recptr indicates the position where streaming should begin, and conninfo
172  * is a libpq connection string to use.
173  */
174 void
175 RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
176 {
177         /* use volatile pointer to prevent code rearrangement */
178         volatile WalRcvData *walrcv = WalRcv;
179         pg_time_t       now = (pg_time_t) time(NULL);
180
181         /*
182          * We always start at the beginning of the segment. That prevents a broken
183          * segment (i.e., with no records in the first half of a segment) from
184          * being created by XLOG streaming, which might cause trouble later on if
185          * the segment is e.g archived.
186          */
187         if (recptr.xrecoff % XLogSegSize != 0)
188                 recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
189
190         SpinLockAcquire(&walrcv->mutex);
191
192         /* It better be stopped before we try to restart it */
193         Assert(walrcv->walRcvState == WALRCV_STOPPED);
194
195         if (conninfo != NULL)
196                 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
197         else
198                 walrcv->conninfo[0] = '\0';
199         walrcv->walRcvState = WALRCV_STARTING;
200         walrcv->startTime = now;
201
202         walrcv->receivedUpto = recptr;
203         walrcv->latestChunkStart = recptr;
204
205         SpinLockRelease(&walrcv->mutex);
206
207         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
208 }
209
210 /*
211  * Returns the last+1 byte position that walreceiver has written.
212  *
213  * Optionally, returns the previous chunk start, that is the first byte
214  * written in the most recent walreceiver flush cycle.  Callers not
215  * interested in that value may pass NULL for latestChunkStart.
216  */
217 XLogRecPtr
218 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
219 {
220         /* use volatile pointer to prevent code rearrangement */
221         volatile WalRcvData *walrcv = WalRcv;
222         XLogRecPtr      recptr;
223
224         SpinLockAcquire(&walrcv->mutex);
225         recptr = walrcv->receivedUpto;
226         if (latestChunkStart)
227                 *latestChunkStart = walrcv->latestChunkStart;
228         SpinLockRelease(&walrcv->mutex);
229
230         return recptr;
231 }