]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Exit from base backups when shutdown is requested
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  * It attempts to keep reading XLOG records from the disk and sending them
11  * to the standby server, as long as the connection is alive (i.e., like
12  * any backend, there is a one-to-one relationship between a connection
13  * and a walsender process).
14  *
15  * Normal termination is by SIGTERM, which instructs the walsender to
16  * close the connection and exit(0) at next convenient moment. Emergency
17  * termination is by SIGQUIT; like any backend, the walsender will simply
18  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
19  * are treated as not a crash but approximately normal termination;
20  * the walsender will exit quickly without sending any more XLOG records.
21  *
22  * If the server is shut down, postmaster sends us SIGUSR2 after all
23  * regular backends have exited and the shutdown checkpoint has been written.
24  * This instructs walsender to send any outstanding WAL, including the
25  * shutdown checkpoint record, and then exit.
26  *
27  *
28  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
29  *
30  * IDENTIFICATION
31  *        src/backend/replication/walsender.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres.h"
36
37 #include <signal.h>
38 #include <unistd.h>
39
40 #include "funcapi.h"
41 #include "access/xlog_internal.h"
42 #include "catalog/pg_type.h"
43 #include "libpq/libpq.h"
44 #include "libpq/pqformat.h"
45 #include "libpq/pqsignal.h"
46 #include "miscadmin.h"
47 #include "replication/basebackup.h"
48 #include "replication/walprotocol.h"
49 #include "replication/walsender.h"
50 #include "storage/fd.h"
51 #include "storage/ipc.h"
52 #include "storage/pmsignal.h"
53 #include "tcop/tcopprot.h"
54 #include "utils/builtins.h"
55 #include "utils/guc.h"
56 #include "utils/memutils.h"
57 #include "utils/ps_status.h"
58 #include "utils/resowner.h"
59
60
61 /* Array of WalSnds in shared memory */
62 WalSndCtlData *WalSndCtl = NULL;
63
64 /* My slot in the shared memory array */
65 static WalSnd *MyWalSnd = NULL;
66
67 /* Global state */
68 bool            am_walsender = false;           /* Am I a walsender process ? */
69
70 /* User-settable parameters for walsender */
71 int                     max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
72 int                     WalSndDelay = 200;      /* max sleep time between some actions */
73
74 /*
75  * These variables are used similarly to openLogFile/Id/Seg/Off,
76  * but for walsender to read the XLOG.
77  */
78 static int      sendFile = -1;
79 static uint32 sendId = 0;
80 static uint32 sendSeg = 0;
81 static uint32 sendOff = 0;
82
83 /*
84  * How far have we sent WAL already? This is also advertised in
85  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
86  */
87 static XLogRecPtr sentPtr = {0, 0};
88
89 /* Flags set by signal handlers for later service in main loop */
90 static volatile sig_atomic_t got_SIGHUP = false;
91 volatile sig_atomic_t walsender_shutdown_requested = false;
92 volatile sig_atomic_t walsender_ready_to_stop = false;
93
94 /* Signal handlers */
95 static void WalSndSigHupHandler(SIGNAL_ARGS);
96 static void WalSndShutdownHandler(SIGNAL_ARGS);
97 static void WalSndQuickDieHandler(SIGNAL_ARGS);
98 static void WalSndXLogSendHandler(SIGNAL_ARGS);
99 static void WalSndLastCycleHandler(SIGNAL_ARGS);
100
101 /* Prototypes for private functions */
102 static int      WalSndLoop(void);
103 static void InitWalSnd(void);
104 static void WalSndHandshake(void);
105 static void WalSndKill(int code, Datum arg);
106 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
107 static bool XLogSend(char *msgbuf, bool *caughtup);
108 static void CheckClosedConnection(void);
109
110
111 /* Main entry point for walsender process */
112 int
113 WalSenderMain(void)
114 {
115         MemoryContext walsnd_context;
116
117         if (RecoveryInProgress())
118                 ereport(FATAL,
119                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
120                                  errmsg("recovery is still in progress, can't accept WAL streaming connections")));
121
122         /* Create a per-walsender data structure in shared memory */
123         InitWalSnd();
124
125         /*
126          * Create a memory context that we will do all our work in.  We do this so
127          * that we can reset the context during error recovery and thereby avoid
128          * possible memory leaks.  Formerly this code just ran in
129          * TopMemoryContext, but resetting that would be a really bad idea.
130          *
131          * XXX: we don't actually attempt error recovery in walsender, we just
132          * close the connection and exit.
133          */
134         walsnd_context = AllocSetContextCreate(TopMemoryContext,
135                                                                                    "Wal Sender",
136                                                                                    ALLOCSET_DEFAULT_MINSIZE,
137                                                                                    ALLOCSET_DEFAULT_INITSIZE,
138                                                                                    ALLOCSET_DEFAULT_MAXSIZE);
139         MemoryContextSwitchTo(walsnd_context);
140
141         /* Set up resource owner */
142         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
143
144         /* Unblock signals (they were blocked when the postmaster forked us) */
145         PG_SETMASK(&UnBlockSig);
146
147         /* Tell the standby that walsender is ready for receiving commands */
148         ReadyForQuery(DestRemote);
149
150         /* Handle handshake messages before streaming */
151         WalSndHandshake();
152
153         /* Initialize shared memory status */
154         {
155                 /* use volatile pointer to prevent code rearrangement */
156                 volatile WalSnd *walsnd = MyWalSnd;
157
158                 SpinLockAcquire(&walsnd->mutex);
159                 walsnd->sentPtr = sentPtr;
160                 SpinLockRelease(&walsnd->mutex);
161         }
162
163         /* Main loop of walsender */
164         return WalSndLoop();
165 }
166
167 /*
168  * Execute commands from walreceiver, until we enter streaming mode.
169  */
170 static void
171 WalSndHandshake(void)
172 {
173         StringInfoData input_message;
174         bool            replication_started = false;
175
176         initStringInfo(&input_message);
177
178         while (!replication_started)
179         {
180                 int                     firstchar;
181
182                 WalSndSetState(WALSNDSTATE_STARTUP);
183                 set_ps_display("idle", false);
184
185                 /* Wait for a command to arrive */
186                 firstchar = pq_getbyte();
187
188                 /*
189                  * Emergency bailout if postmaster has died.  This is to avoid the
190                  * necessity for manual cleanup of all postmaster children.
191                  */
192                 if (!PostmasterIsAlive(true))
193                         exit(1);
194
195                 /*
196                  * Check for any other interesting events that happened while we
197                  * slept.
198                  */
199                 if (got_SIGHUP)
200                 {
201                         got_SIGHUP = false;
202                         ProcessConfigFile(PGC_SIGHUP);
203                 }
204
205                 if (firstchar != EOF)
206                 {
207                         /*
208                          * Read the message contents. This is expected to be done without
209                          * blocking because we've been able to get message type code.
210                          */
211                         if (pq_getmessage(&input_message, 0))
212                                 firstchar = EOF;        /* suitable message already logged */
213                 }
214
215                 /* Handle the very limited subset of commands expected in this phase */
216                 switch (firstchar)
217                 {
218                         case 'Q':                       /* Query message */
219                                 {
220                                         const char *query_string;
221                                         XLogRecPtr      recptr;
222
223                                         query_string = pq_getmsgstring(&input_message);
224                                         pq_getmsgend(&input_message);
225
226                                         if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
227                                         {
228                                                 StringInfoData buf;
229                                                 char            sysid[32];
230                                                 char            tli[11];
231
232                                                 /*
233                                                  * Reply with a result set with one row, two columns.
234                                                  * First col is system ID, and second is timeline ID
235                                                  */
236
237                                                 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
238                                                                  GetSystemIdentifier());
239                                                 snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
240
241                                                 /* Send a RowDescription message */
242                                                 pq_beginmessage(&buf, 'T');
243                                                 pq_sendint(&buf, 2, 2); /* 2 fields */
244
245                                                 /* first field */
246                                                 pq_sendstring(&buf, "systemid");                /* col name */
247                                                 pq_sendint(&buf, 0, 4); /* table oid */
248                                                 pq_sendint(&buf, 0, 2); /* attnum */
249                                                 pq_sendint(&buf, TEXTOID, 4);   /* type oid */
250                                                 pq_sendint(&buf, -1, 2);                /* typlen */
251                                                 pq_sendint(&buf, 0, 4); /* typmod */
252                                                 pq_sendint(&buf, 0, 2); /* format code */
253
254                                                 /* second field */
255                                                 pq_sendstring(&buf, "timeline");                /* col name */
256                                                 pq_sendint(&buf, 0, 4); /* table oid */
257                                                 pq_sendint(&buf, 0, 2); /* attnum */
258                                                 pq_sendint(&buf, INT4OID, 4);   /* type oid */
259                                                 pq_sendint(&buf, 4, 2); /* typlen */
260                                                 pq_sendint(&buf, 0, 4); /* typmod */
261                                                 pq_sendint(&buf, 0, 2); /* format code */
262                                                 pq_endmessage(&buf);
263
264                                                 /* Send a DataRow message */
265                                                 pq_beginmessage(&buf, 'D');
266                                                 pq_sendint(&buf, 2, 2); /* # of columns */
267                                                 pq_sendint(&buf, strlen(sysid), 4);             /* col1 len */
268                                                 pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
269                                                 pq_sendint(&buf, strlen(tli), 4);               /* col2 len */
270                                                 pq_sendbytes(&buf, (char *) tli, strlen(tli));
271                                                 pq_endmessage(&buf);
272
273                                                 /* Send CommandComplete and ReadyForQuery messages */
274                                                 EndCommand("SELECT", DestRemote);
275                                                 ReadyForQuery(DestRemote);
276                                                 /* ReadyForQuery did pq_flush for us */
277                                         }
278                                         else if (sscanf(query_string, "START_REPLICATION %X/%X",
279                                                                         &recptr.xlogid, &recptr.xrecoff) == 2)
280                                         {
281                                                 StringInfoData buf;
282
283                                                 /*
284                                                  * Check that we're logging enough information in the
285                                                  * WAL for log-shipping.
286                                                  *
287                                                  * NOTE: This only checks the current value of
288                                                  * wal_level. Even if the current setting is not
289                                                  * 'minimal', there can be old WAL in the pg_xlog
290                                                  * directory that was created with 'minimal'. So this
291                                                  * is not bulletproof, the purpose is just to give a
292                                                  * user-friendly error message that hints how to
293                                                  * configure the system correctly.
294                                                  */
295                                                 if (wal_level == WAL_LEVEL_MINIMAL)
296                                                         ereport(FATAL,
297                                                                         (errcode(ERRCODE_CANNOT_CONNECT_NOW),
298                                                                          errmsg("standby connections not allowed because wal_level=minimal")));
299
300                                                 /* Send a CopyBothResponse message, and start streaming */
301                                                 pq_beginmessage(&buf, 'W');
302                                                 pq_sendbyte(&buf, 0);
303                                                 pq_sendint(&buf, 0, 2);
304                                                 pq_endmessage(&buf);
305                                                 pq_flush();
306
307                                                 /*
308                                                  * Initialize position to the received one, then the
309                                                  * xlog records begin to be shipped from that position
310                                                  */
311                                                 sentPtr = recptr;
312
313                                                 /* break out of the loop */
314                                                 replication_started = true;
315                                         }
316                                         else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
317                                         {
318                                                 /* Command is BASE_BACKUP <options>;<label> */
319                                                 SendBaseBackup(query_string + strlen("BASE_BACKUP "));
320                                                 /* Send CommandComplete and ReadyForQuery messages */
321                                                 EndCommand("SELECT", DestRemote);
322                                                 ReadyForQuery(DestRemote);
323                                                 /* ReadyForQuery did pq_flush for us */
324                                         }
325                                         else
326                                         {
327                                                 ereport(FATAL,
328                                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
329                                                                  errmsg("invalid standby query string: %s", query_string)));
330                                         }
331                                         break;
332                                 }
333
334                         case 'X':
335                                 /* standby is closing the connection */
336                                 proc_exit(0);
337
338                         case EOF:
339                                 /* standby disconnected unexpectedly */
340                                 ereport(COMMERROR,
341                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
342                                                  errmsg("unexpected EOF on standby connection")));
343                                 proc_exit(0);
344
345                         default:
346                                 ereport(FATAL,
347                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
348                                                  errmsg("invalid standby handshake message type %d", firstchar)));
349                 }
350         }
351 }
352
353 /*
354  * Check if the remote end has closed the connection.
355  */
356 static void
357 CheckClosedConnection(void)
358 {
359         unsigned char firstchar;
360         int                     r;
361
362         r = pq_getbyte_if_available(&firstchar);
363         if (r < 0)
364         {
365                 /* unexpected error or EOF */
366                 ereport(COMMERROR,
367                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
368                                  errmsg("unexpected EOF on standby connection")));
369                 proc_exit(0);
370         }
371         if (r == 0)
372         {
373                 /* no data available without blocking */
374                 return;
375         }
376
377         /* Handle the very limited subset of commands expected in this phase */
378         switch (firstchar)
379         {
380                         /*
381                          * 'X' means that the standby is closing down the socket.
382                          */
383                 case 'X':
384                         proc_exit(0);
385
386                 default:
387                         ereport(FATAL,
388                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
389                                          errmsg("invalid standby closing message type %d",
390                                                         firstchar)));
391         }
392 }
393
394 /* Main loop of walsender process */
395 static int
396 WalSndLoop(void)
397 {
398         char       *output_message;
399         bool            caughtup = false;
400
401         /*
402          * Allocate buffer that will be used for each output message.  We do this
403          * just once to reduce palloc overhead.  The buffer must be made large
404          * enough for maximum-sized messages.
405          */
406         output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
407
408         /* Loop forever, unless we get an error */
409         for (;;)
410         {
411                 /*
412                  * Emergency bailout if postmaster has died.  This is to avoid the
413                  * necessity for manual cleanup of all postmaster children.
414                  */
415                 if (!PostmasterIsAlive(true))
416                         exit(1);
417
418                 /* Process any requests or signals received recently */
419                 if (got_SIGHUP)
420                 {
421                         got_SIGHUP = false;
422                         ProcessConfigFile(PGC_SIGHUP);
423                 }
424
425                 /*
426                  * When SIGUSR2 arrives, we send all outstanding logs up to the
427                  * shutdown checkpoint record (i.e., the latest record) and exit.
428                  */
429                 if (walsender_ready_to_stop)
430                 {
431                         if (!XLogSend(output_message, &caughtup))
432                                 break;
433                         if (caughtup)
434                                 walsender_shutdown_requested = true;
435                 }
436
437                 /* Normal exit from the walsender is here */
438                 if (walsender_shutdown_requested)
439                 {
440                         /* Inform the standby that XLOG streaming was done */
441                         pq_puttextmessage('C', "COPY 0");
442                         pq_flush();
443
444                         proc_exit(0);
445                 }
446
447                 /*
448                  * If we had sent all accumulated WAL in last round, nap for the
449                  * configured time before retrying.
450                  */
451                 if (caughtup)
452                 {
453                         /*
454                          * Even if we wrote all the WAL that was available when we started
455                          * sending, more might have arrived while we were sending this
456                          * batch. We had the latch set while sending, so we have not
457                          * received any signals from that time. Let's arm the latch
458                          * again, and after that check that we're still up-to-date.
459                          */
460                         ResetLatch(&MyWalSnd->latch);
461
462                         if (!XLogSend(output_message, &caughtup))
463                                 break;
464                         if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
465                         {
466                                 /*
467                                  * XXX: We don't really need the periodic wakeups anymore,
468                                  * WaitLatchOrSocket should reliably wake up as soon as
469                                  * something interesting happens.
470                                  */
471
472                                 /* Sleep */
473                                 WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
474                                                                   WalSndDelay * 1000L);
475                         }
476
477                         /* Check if the connection was closed */
478                         CheckClosedConnection();
479                 }
480                 else
481                 {
482                         /* Attempt to send the log once every loop */
483                         if (!XLogSend(output_message, &caughtup))
484                                 break;
485                 }
486
487                 /* Update our state to indicate if we're behind or not */
488                 WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
489         }
490
491         /*
492          * Get here on send failure.  Clean up and exit.
493          *
494          * Reset whereToSendOutput to prevent ereport from attempting to send any
495          * more messages to the standby.
496          */
497         if (whereToSendOutput == DestRemote)
498                 whereToSendOutput = DestNone;
499
500         proc_exit(0);
501         return 1;                                       /* keep the compiler quiet */
502 }
503
504 /* Initialize a per-walsender data structure for this walsender process */
505 static void
506 InitWalSnd(void)
507 {
508         int                     i;
509
510         /*
511          * WalSndCtl should be set up already (we inherit this by fork() or
512          * EXEC_BACKEND mechanism from the postmaster).
513          */
514         Assert(WalSndCtl != NULL);
515         Assert(MyWalSnd == NULL);
516
517         /*
518          * Find a free walsender slot and reserve it. If this fails, we must be
519          * out of WalSnd structures.
520          */
521         for (i = 0; i < max_wal_senders; i++)
522         {
523                 /* use volatile pointer to prevent code rearrangement */
524                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
525
526                 SpinLockAcquire(&walsnd->mutex);
527
528                 if (walsnd->pid != 0)
529                 {
530                         SpinLockRelease(&walsnd->mutex);
531                         continue;
532                 }
533                 else
534                 {
535                         /*
536                          * Found a free slot. Reserve it for us.
537                          */
538                         walsnd->pid = MyProcPid;
539                         MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
540                         walsnd->state = WALSNDSTATE_STARTUP;
541                         SpinLockRelease(&walsnd->mutex);
542                         /* don't need the lock anymore */
543                         OwnLatch((Latch *) &walsnd->latch);
544                         MyWalSnd = (WalSnd *) walsnd;
545
546                         break;
547                 }
548         }
549         if (MyWalSnd == NULL)
550                 ereport(FATAL,
551                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
552                                  errmsg("number of requested standby connections "
553                                                 "exceeds max_wal_senders (currently %d)",
554                                                 max_wal_senders)));
555
556         /* Arrange to clean up at walsender exit */
557         on_shmem_exit(WalSndKill, 0);
558 }
559
560 /* Destroy the per-walsender data structure for this walsender process */
561 static void
562 WalSndKill(int code, Datum arg)
563 {
564         Assert(MyWalSnd != NULL);
565
566         /*
567          * Mark WalSnd struct no longer in use. Assume that no lock is required
568          * for this.
569          */
570         MyWalSnd->pid = 0;
571         DisownLatch(&MyWalSnd->latch);
572
573         /* WalSnd struct isn't mine anymore */
574         MyWalSnd = NULL;
575 }
576
577 /*
578  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
579  *
580  * XXX probably this should be improved to suck data directly from the
581  * WAL buffers when possible.
582  */
583 static void
584 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
585 {
586         XLogRecPtr      startRecPtr = recptr;
587         char            path[MAXPGPATH];
588         uint32          lastRemovedLog;
589         uint32          lastRemovedSeg;
590         uint32          log;
591         uint32          seg;
592
593         while (nbytes > 0)
594         {
595                 uint32          startoff;
596                 int                     segbytes;
597                 int                     readbytes;
598
599                 startoff = recptr.xrecoff % XLogSegSize;
600
601                 if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
602                 {
603                         /* Switch to another logfile segment */
604                         if (sendFile >= 0)
605                                 close(sendFile);
606
607                         XLByteToSeg(recptr, sendId, sendSeg);
608                         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
609
610                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
611                         if (sendFile < 0)
612                         {
613                                 /*
614                                  * If the file is not found, assume it's because the standby
615                                  * asked for a too old WAL segment that has already been
616                                  * removed or recycled.
617                                  */
618                                 if (errno == ENOENT)
619                                 {
620                                         char            filename[MAXFNAMELEN];
621
622                                         XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
623                                         ereport(ERROR,
624                                                         (errcode_for_file_access(),
625                                                          errmsg("requested WAL segment %s has already been removed",
626                                                                         filename)));
627                                 }
628                                 else
629                                         ereport(ERROR,
630                                                         (errcode_for_file_access(),
631                                                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
632                                                                         path, sendId, sendSeg)));
633                         }
634                         sendOff = 0;
635                 }
636
637                 /* Need to seek in the file? */
638                 if (sendOff != startoff)
639                 {
640                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
641                                 ereport(ERROR,
642                                                 (errcode_for_file_access(),
643                                                  errmsg("could not seek in log file %u, segment %u to offset %u: %m",
644                                                                 sendId, sendSeg, startoff)));
645                         sendOff = startoff;
646                 }
647
648                 /* How many bytes are within this segment? */
649                 if (nbytes > (XLogSegSize - startoff))
650                         segbytes = XLogSegSize - startoff;
651                 else
652                         segbytes = nbytes;
653
654                 readbytes = read(sendFile, buf, segbytes);
655                 if (readbytes <= 0)
656                         ereport(ERROR,
657                                         (errcode_for_file_access(),
658                         errmsg("could not read from log file %u, segment %u, offset %u, "
659                                    "length %lu: %m",
660                                    sendId, sendSeg, sendOff, (unsigned long) segbytes)));
661
662                 /* Update state for read */
663                 XLByteAdvance(recptr, readbytes);
664
665                 sendOff += readbytes;
666                 nbytes -= readbytes;
667                 buf += readbytes;
668         }
669
670         /*
671          * After reading into the buffer, check that what we read was valid. We do
672          * this after reading, because even though the segment was present when we
673          * opened it, it might get recycled or removed while we read it. The
674          * read() succeeds in that case, but the data we tried to read might
675          * already have been overwritten with new WAL records.
676          */
677         XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
678         XLByteToSeg(startRecPtr, log, seg);
679         if (log < lastRemovedLog ||
680                 (log == lastRemovedLog && seg <= lastRemovedSeg))
681         {
682                 char            filename[MAXFNAMELEN];
683
684                 XLogFileName(filename, ThisTimeLineID, log, seg);
685                 ereport(ERROR,
686                                 (errcode_for_file_access(),
687                                  errmsg("requested WAL segment %s has already been removed",
688                                                 filename)));
689         }
690 }
691
692 /*
693  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
694  * but not yet sent to the client, and send it.
695  *
696  * msgbuf is a work area in which the output message is constructed.  It's
697  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
698  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
699  *
700  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
701  * *caughtup is set to false.
702  *
703  * Returns true if OK, false if trouble.
704  */
705 static bool
706 XLogSend(char *msgbuf, bool *caughtup)
707 {
708         XLogRecPtr      SendRqstPtr;
709         XLogRecPtr      startptr;
710         XLogRecPtr      endptr;
711         Size            nbytes;
712         WalDataMessageHeader msghdr;
713
714         /*
715          * Attempt to send all data that's already been written out and fsync'd to
716          * disk.  We cannot go further than what's been written out given the
717          * current implementation of XLogRead().  And in any case it's unsafe to
718          * send WAL that is not securely down to disk on the master: if the master
719          * subsequently crashes and restarts, slaves must not have applied any WAL
720          * that gets lost on the master.
721          */
722         SendRqstPtr = GetFlushRecPtr();
723
724         /* Quick exit if nothing to do */
725         if (XLByteLE(SendRqstPtr, sentPtr))
726         {
727                 *caughtup = true;
728                 return true;
729         }
730
731         /*
732          * Figure out how much to send in one message. If there's no more than
733          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
734          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
735          *
736          * The rounding is not only for performance reasons. Walreceiver relies on
737          * the fact that we never split a WAL record across two messages. Since a
738          * long WAL record is split at page boundary into continuation records,
739          * page boundary is always a safe cut-off point. We also assume that
740          * SendRqstPtr never points to the middle of a WAL record.
741          */
742         startptr = sentPtr;
743         if (startptr.xrecoff >= XLogFileSize)
744         {
745                 /*
746                  * crossing a logid boundary, skip the non-existent last log segment
747                  * in previous logical log file.
748                  */
749                 startptr.xlogid += 1;
750                 startptr.xrecoff = 0;
751         }
752
753         endptr = startptr;
754         XLByteAdvance(endptr, MAX_SEND_SIZE);
755         if (endptr.xlogid != startptr.xlogid)
756         {
757                 /* Don't cross a logfile boundary within one message */
758                 Assert(endptr.xlogid == startptr.xlogid + 1);
759                 endptr.xlogid = startptr.xlogid;
760                 endptr.xrecoff = XLogFileSize;
761         }
762
763         /* if we went beyond SendRqstPtr, back off */
764         if (XLByteLE(SendRqstPtr, endptr))
765         {
766                 endptr = SendRqstPtr;
767                 *caughtup = true;
768         }
769         else
770         {
771                 /* round down to page boundary. */
772                 endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
773                 *caughtup = false;
774         }
775
776         nbytes = endptr.xrecoff - startptr.xrecoff;
777         Assert(nbytes <= MAX_SEND_SIZE);
778
779         /*
780          * OK to read and send the slice.
781          */
782         msgbuf[0] = 'w';
783
784         /*
785          * Read the log directly into the output buffer to avoid extra memcpy
786          * calls.
787          */
788         XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
789
790         /*
791          * We fill the message header last so that the send timestamp is taken as
792          * late as possible.
793          */
794         msghdr.dataStart = startptr;
795         msghdr.walEnd = SendRqstPtr;
796         msghdr.sendTime = GetCurrentTimestamp();
797
798         memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
799
800         pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
801
802         /* Flush pending output to the client */
803         if (pq_flush())
804                 return false;
805
806         sentPtr = endptr;
807
808         /* Update shared memory status */
809         {
810                 /* use volatile pointer to prevent code rearrangement */
811                 volatile WalSnd *walsnd = MyWalSnd;
812
813                 SpinLockAcquire(&walsnd->mutex);
814                 walsnd->sentPtr = sentPtr;
815                 SpinLockRelease(&walsnd->mutex);
816         }
817
818         /* Report progress of XLOG streaming in PS display */
819         if (update_process_title)
820         {
821                 char            activitymsg[50];
822
823                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
824                                  sentPtr.xlogid, sentPtr.xrecoff);
825                 set_ps_display(activitymsg, false);
826         }
827
828         return true;
829 }
830
831 /* SIGHUP: set flag to re-read config file at next convenient time */
832 static void
833 WalSndSigHupHandler(SIGNAL_ARGS)
834 {
835         got_SIGHUP = true;
836         if (MyWalSnd)
837                 SetLatch(&MyWalSnd->latch);
838 }
839
840 /* SIGTERM: set flag to shut down */
841 static void
842 WalSndShutdownHandler(SIGNAL_ARGS)
843 {
844         walsender_shutdown_requested = true;
845         if (MyWalSnd)
846                 SetLatch(&MyWalSnd->latch);
847 }
848
849 /*
850  * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
851  *
852  * Some backend has bought the farm,
853  * so we need to stop what we're doing and exit.
854  */
855 static void
856 WalSndQuickDieHandler(SIGNAL_ARGS)
857 {
858         PG_SETMASK(&BlockSig);
859
860         /*
861          * We DO NOT want to run proc_exit() callbacks -- we're here because
862          * shared memory may be corrupted, so we don't want to try to clean up our
863          * transaction.  Just nail the windows shut and get out of town.  Now that
864          * there's an atexit callback to prevent third-party code from breaking
865          * things by calling exit() directly, we have to reset the callbacks
866          * explicitly to make this work as intended.
867          */
868         on_exit_reset();
869
870         /*
871          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
872          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
873          * backend.  This is necessary precisely because we don't clean up our
874          * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
875          * should ensure the postmaster sees this as a crash, too, but no harm in
876          * being doubly sure.)
877          */
878         exit(2);
879 }
880
/*
 * SIGUSR1: delegate to latch_sigusr1_handler().
 *
 * No walsender-specific flag is set here; the handler only forwards the
 * signal to the latch code, presumably so that a process waiting on its
 * latch is woken up.
 *
 * errno is saved and restored around the call, because the delegated
 * handler is not guaranteed to preserve it for the interrupted code.
 */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
	int			save_errno = errno;

	latch_sigusr1_handler();

	errno = save_errno;
}
887
888 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
889 static void
890 WalSndLastCycleHandler(SIGNAL_ARGS)
891 {
892         walsender_ready_to_stop = true;
893         if (MyWalSnd)
894                 SetLatch(&MyWalSnd->latch);
895 }
896
897 /* Set up signal handlers */
898 void
899 WalSndSignals(void)
900 {
901         /* Set up signal handlers */
902         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
903                                                                                                  * file */
904         pqsignal(SIGINT, SIG_IGN);      /* not used */
905         pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
906         pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
907         pqsignal(SIGALRM, SIG_IGN);
908         pqsignal(SIGPIPE, SIG_IGN);
909         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
910         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
911                                                                                                  * shutdown */
912
913         /* Reset some signals that are accepted by postmaster but not here */
914         pqsignal(SIGCHLD, SIG_DFL);
915         pqsignal(SIGTTIN, SIG_DFL);
916         pqsignal(SIGTTOU, SIG_DFL);
917         pqsignal(SIGCONT, SIG_DFL);
918         pqsignal(SIGWINCH, SIG_DFL);
919 }
920
921 /* Report shared-memory space needed by WalSndShmemInit */
922 Size
923 WalSndShmemSize(void)
924 {
925         Size            size = 0;
926
927         size = offsetof(WalSndCtlData, walsnds);
928         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
929
930         return size;
931 }
932
933 /* Allocate and initialize walsender-related shared memory */
934 void
935 WalSndShmemInit(void)
936 {
937         bool            found;
938         int                     i;
939
940         WalSndCtl = (WalSndCtlData *)
941                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
942
943         if (!found)
944         {
945                 /* First time through, so initialize */
946                 MemSet(WalSndCtl, 0, WalSndShmemSize());
947
948                 for (i = 0; i < max_wal_senders; i++)
949                 {
950                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
951
952                         SpinLockInit(&walsnd->mutex);
953                         InitSharedLatch(&walsnd->latch);
954                 }
955         }
956 }
957
958 /* Wake up all walsenders */
959 void
960 WalSndWakeup(void)
961 {
962         int             i;
963
964         for (i = 0; i < max_wal_senders; i++)
965                 SetLatch(&WalSndCtl->walsnds[i].latch);
966 }
967
968 /* Set state for current walsender (only called in walsender) */
969 void
970 WalSndSetState(WalSndState state)
971 {
972         /* use volatile pointer to prevent code rearrangement */
973         volatile WalSnd *walsnd = MyWalSnd;
974
975         Assert(am_walsender);
976
977         if (walsnd->state == state)
978                 return;
979
980         SpinLockAcquire(&walsnd->mutex);
981         walsnd->state = state;
982         SpinLockRelease(&walsnd->mutex);
983 }
984
985 /*
986  * Return a string constant representing the state. This is used
987  * in system views, and should *not* be translated.
988  */
989 static const char *
990 WalSndGetStateString(WalSndState state)
991 {
992         switch (state)
993         {
994                 case WALSNDSTATE_STARTUP:
995                         return "STARTUP";
996                 case WALSNDSTATE_BACKUP:
997                         return "BACKUP";
998                 case WALSNDSTATE_CATCHUP:
999                         return "CATCHUP";
1000                 case WALSNDSTATE_STREAMING:
1001                         return "STREAMING";
1002         }
1003         return "UNKNOWN";
1004 }
1005
1006
1007 /*
1008  * Returns activity of walsenders, including pids and xlog locations sent to
1009  * standby servers.
1010  */
1011 Datum
1012 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1013 {
1014 #define PG_STAT_GET_WAL_SENDERS_COLS    3
1015         ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1016         TupleDesc                       tupdesc;
1017         Tuplestorestate    *tupstore;
1018         MemoryContext           per_query_ctx;
1019         MemoryContext           oldcontext;
1020         int                                     i;
1021
1022         /* check to see if caller supports us returning a tuplestore */
1023         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1024                 ereport(ERROR,
1025                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1026                                  errmsg("set-valued function called in context that cannot accept a set")));
1027         if (!(rsinfo->allowedModes & SFRM_Materialize))
1028                 ereport(ERROR,
1029                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1030                                  errmsg("materialize mode required, but it is not " \
1031                                                 "allowed in this context")));
1032
1033         /* Build a tuple descriptor for our result type */
1034         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1035                 elog(ERROR, "return type must be a row type");
1036
1037         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1038         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1039
1040         tupstore = tuplestore_begin_heap(true, false, work_mem);
1041         rsinfo->returnMode = SFRM_Materialize;
1042         rsinfo->setResult = tupstore;
1043         rsinfo->setDesc = tupdesc;
1044
1045         MemoryContextSwitchTo(oldcontext);
1046
1047         for (i = 0; i < max_wal_senders; i++)
1048         {
1049                 /* use volatile pointer to prevent code rearrangement */
1050                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1051                 char            sent_location[MAXFNAMELEN];
1052                 XLogRecPtr      sentPtr;
1053                 WalSndState     state;
1054                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1055                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1056
1057                 if (walsnd->pid == 0)
1058                         continue;
1059
1060                 SpinLockAcquire(&walsnd->mutex);
1061                 sentPtr = walsnd->sentPtr;
1062                 state = walsnd->state;
1063                 SpinLockRelease(&walsnd->mutex);
1064
1065                 snprintf(sent_location, sizeof(sent_location), "%X/%X",
1066                                         sentPtr.xlogid, sentPtr.xrecoff);
1067
1068                 memset(nulls, 0, sizeof(nulls));
1069                 values[0] = Int32GetDatum(walsnd->pid);
1070                 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1071                 values[2] = CStringGetTextDatum(sent_location);
1072
1073                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1074         }
1075
1076         /* clean up and return the tuplestore */
1077         tuplestore_donestoring(tupstore);
1078
1079         return (Datum) 0;
1080 }
1081
/*
 * Not compiled at present (guarded by NOT_USED).  Kept because monitoring
 * tools may want it later, and synchronous replication will need something
 * along these lines.
 */
#ifdef NOT_USED
/*
 * Return the smallest sentPtr among active walsenders, ignoring slots whose
 * pid is 0 and slots whose sentPtr is still {0, 0}.  Returns
 * InvalidXLogRecPtr ({0, 0}) when no such walsender exists.
 */
XLogRecPtr
GetOldestWALSendPointer(void)
{
	XLogRecPtr	oldest = {0, 0};
	bool		haveOldest = false;
	int			slotno;

	for (slotno = 0; slotno < max_wal_senders; slotno++)
	{
		/* volatile pointer keeps the load from being moved outside the lock */
		volatile WalSnd *slot = &WalSndCtl->walsnds[slotno];
		XLogRecPtr	candidate;

		if (slot->pid == 0)
			continue;

		SpinLockAcquire(&slot->mutex);
		candidate = slot->sentPtr;
		SpinLockRelease(&slot->mutex);

		/* Nothing sent yet by this walsender */
		if (candidate.xlogid == 0 && candidate.xrecoff == 0)
			continue;

		if (!haveOldest || XLByteLT(candidate, oldest))
			oldest = candidate;
		haveOldest = true;
	}

	return oldest;
}

#endif