1 /*-------------------------------------------------------------------------
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
9 * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
13 * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.7 2010/07/06 19:18:57 momjian Exp $
15 *-------------------------------------------------------------------------
19 #include <sys/types.h>
26 #include "access/xlog_internal.h"
27 #include "replication/walreceiver.h"
28 #include "storage/fd.h"
29 #include "storage/pmsignal.h"
30 #include "storage/shmem.h"
31 #include "utils/guc.h"
/*
 * Global handle on the walreceiver control structure. It is NULL here and
 * is assigned the result of ShmemInitStruct() later in this file.
 */
33 WalRcvData *WalRcv = NULL;
/*
 * NOTE(review): the two text lines below read like the body of a block
 * comment whose delimiters are not visible in this excerpt. The constant
 * they describe is compared against (now - startTime) in WalRcvInProgress()
 * to decide when a WALRCV_STARTING state is abandoned.
 */
36 * How long to wait for walreceiver to start up after requesting
37 * postmaster to launch it. In seconds.
39 #define WALRCV_STARTUP_TIMEOUT 10
/*
 * Size computation for the walreceiver shared-memory area. The one visible
 * statement adds sizeof(WalRcvData) to a running total through add_size().
 *
 * NOTE(review): the function header, the declaration of "size" and the
 * return statement are not visible in this excerpt. Presumably this is
 * WalRcvShmemSize(), whose result is passed to ShmemInitStruct() and
 * MemSet() further down in this file -- confirm against the full source.
 */
41 /* Report shared memory space needed by WalRcvShmemInit */
47 size = add_size(size, sizeof(WalRcvData));
/*
 * Obtains the shared-memory struct registered under the name
 * "Wal Receiver Ctl", sized by WalRcvShmemSize(), and stores its address in
 * the global WalRcv pointer. The initialisation statements zero the whole
 * area, set walRcvState to WALRCV_STOPPED and initialise the mutex.
 *
 * NOTE(review): the function header, the declaration of "found" and the
 * test that guards the initialisation are not visible in this excerpt.
 * Presumably the three initialisation statements run only when "found" is
 * false, as the "First time through" remark suggests -- confirm.
 */
52 /* Allocate and initialize walreceiver-related shared memory */
58 WalRcv = (WalRcvData *)
59 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
63 /* First time through, so initialize */
/* Zero everything first, then set the fields that need a non-zero value. */
64 MemSet(WalRcv, 0, WalRcvShmemSize());
65 WalRcv->walRcvState = WALRCV_STOPPED;
66 SpinLockInit(&WalRcv->mutex);
/*
 * WalRcvInProgress
 *
 * Takes a snapshot of walRcvState and startTime while holding the spinlock.
 * If the snapshot says WALRCV_STARTING and more than WALRCV_STARTUP_TIMEOUT
 * seconds have elapsed since startTime, the lock is taken a second time and
 * the shared state is changed to WALRCV_STOPPED, but only if it is still
 * WALRCV_STARTING at that moment; the local copy is updated in the same
 * assignment. The last visible test compares the local state against
 * WALRCV_STOPPED.
 *
 * NOTE(review): the return type, the declarations of "state" and
 * "startTime", the braces and the return statements are not visible in this
 * excerpt. A wait loop later in this file uses the result as its loop
 * condition, so this presumably returns true while the state is anything
 * other than WALRCV_STOPPED -- confirm against the full source.
 */
70 /* Is walreceiver in progress (or starting up)? */
72 WalRcvInProgress(void)
74 /* use volatile pointer to prevent code rearrangement */
75 volatile WalRcvData *walrcv = WalRcv;
79 SpinLockAcquire(&walrcv->mutex);
/* Copy both fields out so the lock is not held across the time() call. */
81 state = walrcv->walRcvState;
82 startTime = walrcv->startTime;
84 SpinLockRelease(&walrcv->mutex);
87 * If it has taken too long for walreceiver to start up, give up. Setting
88 * the state to STOPPED ensures that if walreceiver later does start up
89 * after all, it will see that it's not supposed to be running and die
90 * without doing anything.
92 if (state == WALRCV_STARTING)
94 pg_time_t now = (pg_time_t) time(NULL);
96 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
98 SpinLockAcquire(&walrcv->mutex);
100 if (walrcv->walRcvState == WALRCV_STARTING)
101 state = walrcv->walRcvState = WALRCV_STOPPED;
103 SpinLockRelease(&walrcv->mutex);
107 if (state != WALRCV_STOPPED)
/*
 * Visible steps of the walreceiver stop routine:
 *   1. Under the spinlock, switch on walRcvState. A WALRCV_STARTING state
 *      is set straight to WALRCV_STOPPED; another branch sets
 *      WALRCV_STOPPING; the WALRCV_STOPPING branch copies walrcv->pid into
 *      walrcvpid.
 *   2. After the lock is released, SIGTERM is sent to walrcvpid with kill().
 *   3. The routine then polls WalRcvInProgress(), calling
 *      HandleStartupProcInterrupts() and sleeping 100ms on every iteration.
 *
 * NOTE(review): the function header, the declaration of walrcvpid, some of
 * the case labels, the break / fall-through lines and whatever condition
 * guards the kill() call are not visible in this excerpt. Confirm the exact
 * control flow against the full source before relying on it.
 */
114 * Stop walreceiver (if running) and wait for it to die.
119 /* use volatile pointer to prevent code rearrangement */
120 volatile WalRcvData *walrcv = WalRcv;
124 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
125 * mode once it's finished, and will also request postmaster to not
128 SpinLockAcquire(&walrcv->mutex);
129 switch (walrcv->walRcvState)
133 case WALRCV_STARTING:
134 walrcv->walRcvState = WALRCV_STOPPED;
138 walrcv->walRcvState = WALRCV_STOPPING;
140 case WALRCV_STOPPING:
141 walrcvpid = walrcv->pid;
144 SpinLockRelease(&walrcv->mutex);
147 * Signal walreceiver process if it was still running.
150 kill(walrcvpid, SIGTERM);
153 * Wait for walreceiver to acknowledge its death by setting state to
156 while (WalRcvInProgress())
159 * This possibly-long loop needs to handle interrupts of startup
162 HandleStartupProcInterrupts();
164 pg_usleep(100000); /* 100ms */
/*
 * RequestXLogStreaming
 *
 * Parameters:
 *   recptr   - requested start position. Its xrecoff is rounded down to a
 *              multiple of XLogSegSize before it is stored.
 *   conninfo - connection string, or NULL. A non-NULL value is copied into
 *              the shared conninfo buffer with strlcpy(), bounded by
 *              MAXCONNINFO.
 *
 * While holding the spinlock the function sets walRcvState to
 * WALRCV_STARTING, records startTime (sampled with time(NULL) at function
 * entry) and sets both receivedUpto and latestChunkStart to the rounded
 * recptr. After releasing the lock it sends PMSIGNAL_START_WALRECEIVER to
 * the postmaster.
 *
 * The precondition walRcvState == WALRCV_STOPPED is checked only by Assert.
 *
 * NOTE(review): the return type, the braces and the line between the
 * strlcpy() call and the empty-string assignment are not visible in this
 * excerpt. Presumably that line is an "else", so the buffer is emptied only
 * when conninfo is NULL -- confirm against the full source.
 */
169 * Request postmaster to start walreceiver.
171 * recptr indicates the position where streaming should begin, and conninfo
172 * is a libpq connection string to use.
175 RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
177 /* use volatile pointer to prevent code rearrangement */
178 volatile WalRcvData *walrcv = WalRcv;
179 pg_time_t now = (pg_time_t) time(NULL);
182 * We always start at the beginning of the segment. That prevents a broken
183 * segment (i.e., with no records in the first half of a segment) from
184 * being created by XLOG streaming, which might cause trouble later on if
185 * the segment is e.g archived.
/* Subtract the remainder so xrecoff lands on a segment boundary. */
187 if (recptr.xrecoff % XLogSegSize != 0)
188 recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
190 SpinLockAcquire(&walrcv->mutex);
192 /* It better be stopped before we try to restart it */
193 Assert(walrcv->walRcvState == WALRCV_STOPPED);
/* The cast hands strlcpy() a plain char pointer; walrcv is volatile. */
195 if (conninfo != NULL)
196 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
198 walrcv->conninfo[0] = '\0';
199 walrcv->walRcvState = WALRCV_STARTING;
200 walrcv->startTime = now;
/* Both positions start out at the (rounded) requested start point. */
202 walrcv->receivedUpto = recptr;
203 walrcv->latestChunkStart = recptr;
205 SpinLockRelease(&walrcv->mutex);
207 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
/*
 * GetWalRcvWriteRecPtr
 *
 * Parameters:
 *   latestChunkStart - optional output; when non-NULL it receives the
 *                      current value of walrcv->latestChunkStart.
 *
 * Both shared fields are read inside a single spinlock-protected section,
 * so the two values are taken from the same moment.
 *
 * NOTE(review): the return type, the declaration of "recptr" and the return
 * statement are not visible in this excerpt. Presumably the function
 * returns recptr, i.e. the copy of walrcv->receivedUpto -- confirm against
 * the full source.
 */
211 * Returns the last+1 byte position that walreceiver has written.
213 * Optionally, returns the previous chunk start, that is the first byte
214 * written in the most recent walreceiver flush cycle. Callers not
215 * interested in that value may pass NULL for latestChunkStart.
218 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
220 /* use volatile pointer to prevent code rearrangement */
221 volatile WalRcvData *walrcv = WalRcv;
224 SpinLockAcquire(&walrcv->mutex);
225 recptr = walrcv->receivedUpto;
226 if (latestChunkStart)
227 *latestChunkStart = walrcv->latestChunkStart;
228 SpinLockRelease(&walrcv->mutex);