]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Efficient transaction-controlled synchronous replication.
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  * It attempts to keep reading XLOG records from the disk and sending them
11  * to the standby server, as long as the connection is alive (i.e., like
12  * any backend, there is a one-to-one relationship between a connection
13  * and a walsender process).
14  *
15  * Normal termination is by SIGTERM, which instructs the walsender to
16  * close the connection and exit(0) at next convenient moment. Emergency
17  * termination is by SIGQUIT; like any backend, the walsender will simply
18  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
19  * are treated as not a crash but approximately normal termination;
20  * the walsender will exit quickly without sending any more XLOG records.
21  *
22  * If the server is shut down, postmaster sends us SIGUSR2 after all
23  * regular backends have exited and the shutdown checkpoint has been written.
24  * This instructs walsender to send any outstanding WAL, including the
25  * shutdown checkpoint record, and then exit.
26  *
27  *
28  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
29  *
30  * IDENTIFICATION
31  *        src/backend/replication/walsender.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres.h"
36
37 #include <signal.h>
38 #include <unistd.h>
39
40 #include "funcapi.h"
41 #include "access/xlog_internal.h"
42 #include "access/transam.h"
43 #include "catalog/pg_type.h"
44 #include "libpq/libpq.h"
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47 #include "miscadmin.h"
48 #include "replication/basebackup.h"
49 #include "replication/replnodes.h"
50 #include "replication/walprotocol.h"
51 #include "replication/walsender.h"
52 #include "storage/fd.h"
53 #include "storage/ipc.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "storage/procarray.h"
57 #include "tcop/tcopprot.h"
58 #include "utils/builtins.h"
59 #include "utils/guc.h"
60 #include "utils/memutils.h"
61 #include "utils/ps_status.h"
62 #include "utils/resowner.h"
63
64
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/*
 * My slot in the shared memory array.  InitWalSnd() asserts that this is
 * still NULL when it is called, i.e. before a slot has been reserved.
 */
WalSnd *MyWalSnd = NULL;

/* Global state */
bool		am_walsender = false;		/* Am I a walsender process ? */

/* User-settable parameters for walsender */
int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
/*
 * Max sleep time between some actions.  WalSndLoop() passes
 * WalSndDelay * 1000L as the timeout to WaitLatchOrSocket().
 */
int			WalSndDelay = 1000;	/* max sleep time between some actions */

/*
 * These variables are used similarly to openLogFile/Id/Seg/Off,
 * but for walsender to read the XLOG.
 * NOTE(review): sendFile == -1 presumably means "no segment file currently
 * open" -- confirm against XLogRead().
 */
static int	sendFile = -1;
static uint32 sendId = 0;
static uint32 sendSeg = 0;
static uint32 sendOff = 0;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
 * StartReplication() sets it from the standby's requested start point.
 */
static XLogRecPtr sentPtr = {0, 0};

/*
 * Buffer for processing reply messages.  It is initialized once in
 * WalSndLoop() and reset for every incoming message by
 * ProcessStandbyMessage().
 */
static StringInfoData reply_message;

/*
 * Flags set by signal handlers for later service in main loop.
 * walsender_ready_to_stop asks WalSndLoop() to send the remaining WAL and
 * then stop.  WalSndLoop() itself also sets walsender_shutdown_requested
 * once it has caught up in that mode.
 */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
volatile sig_atomic_t walsender_ready_to_stop = false;

/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);

/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
static int	WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
123
124
125 /* Main entry point for walsender process */
126 int
127 WalSenderMain(void)
128 {
129         MemoryContext walsnd_context;
130
131         if (RecoveryInProgress())
132                 ereport(FATAL,
133                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
134                                  errmsg("recovery is still in progress, can't accept WAL streaming connections")));
135
136         /* Create a per-walsender data structure in shared memory */
137         InitWalSnd();
138
139         /*
140          * Create a memory context that we will do all our work in.  We do this so
141          * that we can reset the context during error recovery and thereby avoid
142          * possible memory leaks.  Formerly this code just ran in
143          * TopMemoryContext, but resetting that would be a really bad idea.
144          *
145          * XXX: we don't actually attempt error recovery in walsender, we just
146          * close the connection and exit.
147          */
148         walsnd_context = AllocSetContextCreate(TopMemoryContext,
149                                                                                    "Wal Sender",
150                                                                                    ALLOCSET_DEFAULT_MINSIZE,
151                                                                                    ALLOCSET_DEFAULT_INITSIZE,
152                                                                                    ALLOCSET_DEFAULT_MAXSIZE);
153         MemoryContextSwitchTo(walsnd_context);
154
155         /* Set up resource owner */
156         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
157
158         /* Unblock signals (they were blocked when the postmaster forked us) */
159         PG_SETMASK(&UnBlockSig);
160
161         /* Tell the standby that walsender is ready for receiving commands */
162         ReadyForQuery(DestRemote);
163
164         /* Handle handshake messages before streaming */
165         WalSndHandshake();
166
167         /* Initialize shared memory status */
168         {
169                 /* use volatile pointer to prevent code rearrangement */
170                 volatile WalSnd *walsnd = MyWalSnd;
171
172                 SpinLockAcquire(&walsnd->mutex);
173                 walsnd->sentPtr = sentPtr;
174                 SpinLockRelease(&walsnd->mutex);
175         }
176
177         SyncRepInitConfig();
178
179         /* Main loop of walsender */
180         return WalSndLoop();
181 }
182
183 /*
184  * Execute commands from walreceiver, until we enter streaming mode.
185  */
186 static void
187 WalSndHandshake(void)
188 {
189         StringInfoData input_message;
190         bool            replication_started = false;
191
192         initStringInfo(&input_message);
193
194         while (!replication_started)
195         {
196                 int                     firstchar;
197
198                 WalSndSetState(WALSNDSTATE_STARTUP);
199                 set_ps_display("idle", false);
200
201                 /* Wait for a command to arrive */
202                 firstchar = pq_getbyte();
203
204                 /*
205                  * Emergency bailout if postmaster has died.  This is to avoid the
206                  * necessity for manual cleanup of all postmaster children.
207                  */
208                 if (!PostmasterIsAlive(true))
209                         exit(1);
210
211                 /*
212                  * Check for any other interesting events that happened while we
213                  * slept.
214                  */
215                 if (got_SIGHUP)
216                 {
217                         got_SIGHUP = false;
218                         ProcessConfigFile(PGC_SIGHUP);
219                 }
220
221                 if (firstchar != EOF)
222                 {
223                         /*
224                          * Read the message contents. This is expected to be done without
225                          * blocking because we've been able to get message type code.
226                          */
227                         if (pq_getmessage(&input_message, 0))
228                                 firstchar = EOF;        /* suitable message already logged */
229                 }
230
231                 /* Handle the very limited subset of commands expected in this phase */
232                 switch (firstchar)
233                 {
234                         case 'Q':                       /* Query message */
235                                 {
236                                         const char *query_string;
237
238                                         query_string = pq_getmsgstring(&input_message);
239                                         pq_getmsgend(&input_message);
240
241                                         if (HandleReplicationCommand(query_string))
242                                                 replication_started = true;
243                                 }
244                                 break;
245
246                         case 'X':
247                                 /* standby is closing the connection */
248                                 proc_exit(0);
249
250                         case EOF:
251                                 /* standby disconnected unexpectedly */
252                                 ereport(COMMERROR,
253                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
254                                                  errmsg("unexpected EOF on standby connection")));
255                                 proc_exit(0);
256
257                         default:
258                                 ereport(FATAL,
259                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
260                                                  errmsg("invalid standby handshake message type %d", firstchar)));
261                 }
262         }
263 }
264
265 /*
266  * IDENTIFY_SYSTEM
267  */
268 static void
269 IdentifySystem(void)
270 {
271         StringInfoData buf;
272         char            sysid[32];
273         char            tli[11];
274         char            xpos[MAXFNAMELEN];
275         XLogRecPtr      logptr;
276
277         /*
278          * Reply with a result set with one row, three columns. First col is system
279          * ID, second is timeline ID, and third is current xlog location.
280          */
281
282         snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
283                          GetSystemIdentifier());
284         snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
285
286         logptr = GetInsertRecPtr();
287
288         snprintf(xpos, sizeof(xpos), "%X/%X",
289                          logptr.xlogid, logptr.xrecoff);
290
291         /* Send a RowDescription message */
292         pq_beginmessage(&buf, 'T');
293         pq_sendint(&buf, 3, 2);         /* 3 fields */
294
295         /* first field */
296         pq_sendstring(&buf, "systemid");        /* col name */
297         pq_sendint(&buf, 0, 4);         /* table oid */
298         pq_sendint(&buf, 0, 2);         /* attnum */
299         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
300         pq_sendint(&buf, -1, 2);        /* typlen */
301         pq_sendint(&buf, 0, 4);         /* typmod */
302         pq_sendint(&buf, 0, 2);         /* format code */
303
304         /* second field */
305         pq_sendstring(&buf, "timeline");        /* col name */
306         pq_sendint(&buf, 0, 4);         /* table oid */
307         pq_sendint(&buf, 0, 2);         /* attnum */
308         pq_sendint(&buf, INT4OID, 4);           /* type oid */
309         pq_sendint(&buf, 4, 2);         /* typlen */
310         pq_sendint(&buf, 0, 4);         /* typmod */
311         pq_sendint(&buf, 0, 2);         /* format code */
312
313         /* third field */
314         pq_sendstring(&buf, "xlogpos");
315         pq_sendint(&buf, 0, 4);
316         pq_sendint(&buf, 0, 2);
317         pq_sendint(&buf, TEXTOID, 4);
318         pq_sendint(&buf, -1, 2);
319         pq_sendint(&buf, 0, 4);
320         pq_sendint(&buf, 0, 2);
321         pq_endmessage(&buf);
322
323         /* Send a DataRow message */
324         pq_beginmessage(&buf, 'D');
325         pq_sendint(&buf, 3, 2);         /* # of columns */
326         pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
327         pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
328         pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
329         pq_sendbytes(&buf, (char *) tli, strlen(tli));
330         pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
331         pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
332
333         pq_endmessage(&buf);
334
335         /* Send CommandComplete and ReadyForQuery messages */
336         EndCommand("SELECT", DestRemote);
337         ReadyForQuery(DestRemote);
338         /* ReadyForQuery did pq_flush for us */
339 }
340
341 /*
342  * START_REPLICATION
343  */
344 static void
345 StartReplication(StartReplicationCmd * cmd)
346 {
347         StringInfoData buf;
348
349         /*
350          * Let postmaster know that we're streaming. Once we've declared us as
351          * a WAL sender process, postmaster will let us outlive the bgwriter and
352          * kill us last in the shutdown sequence, so we get a chance to stream
353          * all remaining WAL at shutdown, including the shutdown checkpoint.
354          * Note that there's no going back, and we mustn't write any WAL records
355          * after this.
356          */
357         MarkPostmasterChildWalSender();
358
359         /*
360          * Check that we're logging enough information in the WAL for
361          * log-shipping.
362          *
363          * NOTE: This only checks the current value of wal_level. Even if the
364          * current setting is not 'minimal', there can be old WAL in the pg_xlog
365          * directory that was created with 'minimal'. So this is not bulletproof,
366          * the purpose is just to give a user-friendly error message that hints
367          * how to configure the system correctly.
368          */
369         if (wal_level == WAL_LEVEL_MINIMAL)
370                 ereport(FATAL,
371                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
372                 errmsg("standby connections not allowed because wal_level=minimal")));
373
374         /*
375          * When we first start replication the standby will be behind the primary.
376          * For some applications, for example, synchronous replication, it is
377          * important to have a clear state for this initial catchup mode, so we
378          * can trigger actions when we change streaming state later. We may stay
379          * in this state for a long time, which is exactly why we want to be
380          * able to monitor whether or not we are still here.
381          */
382         WalSndSetState(WALSNDSTATE_CATCHUP);
383
384         /* Send a CopyBothResponse message, and start streaming */
385         pq_beginmessage(&buf, 'W');
386         pq_sendbyte(&buf, 0);
387         pq_sendint(&buf, 0, 2);
388         pq_endmessage(&buf);
389         pq_flush();
390
391         /*
392          * Initialize position to the received one, then the xlog records begin to
393          * be shipped from that position
394          */
395         sentPtr = cmd->startpoint;
396 }
397
398 /*
399  * Execute an incoming replication command.
400  */
401 static bool
402 HandleReplicationCommand(const char *cmd_string)
403 {
404         bool            replication_started = false;
405         int                     parse_rc;
406         Node       *cmd_node;
407         MemoryContext cmd_context;
408         MemoryContext old_context;
409
410         elog(DEBUG1, "received replication command: %s", cmd_string);
411
412         cmd_context = AllocSetContextCreate(CurrentMemoryContext,
413                                                                                 "Replication command context",
414                                                                                 ALLOCSET_DEFAULT_MINSIZE,
415                                                                                 ALLOCSET_DEFAULT_INITSIZE,
416                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
417         old_context = MemoryContextSwitchTo(cmd_context);
418
419         replication_scanner_init(cmd_string);
420         parse_rc = replication_yyparse();
421         if (parse_rc != 0)
422                 ereport(ERROR,
423                                 (errcode(ERRCODE_SYNTAX_ERROR),
424                                  (errmsg_internal("replication command parser returned %d",
425                                                                   parse_rc))));
426
427         cmd_node = replication_parse_result;
428
429         switch (cmd_node->type)
430         {
431                 case T_IdentifySystemCmd:
432                         IdentifySystem();
433                         break;
434
435                 case T_StartReplicationCmd:
436                         StartReplication((StartReplicationCmd *) cmd_node);
437
438                         /* break out of the loop */
439                         replication_started = true;
440                         break;
441
442                 case T_BaseBackupCmd:
443                         SendBaseBackup((BaseBackupCmd *) cmd_node);
444
445                         /* Send CommandComplete and ReadyForQuery messages */
446                         EndCommand("SELECT", DestRemote);
447                         ReadyForQuery(DestRemote);
448                         /* ReadyForQuery did pq_flush for us */
449                         break;
450
451                 default:
452                         ereport(FATAL,
453                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
454                                          errmsg("invalid standby query string: %s", cmd_string)));
455         }
456
457         /* done */
458         MemoryContextSwitchTo(old_context);
459         MemoryContextDelete(cmd_context);
460
461         return replication_started;
462 }
463
464 /*
465  * Check if the remote end has closed the connection.
466  */
467 static void
468 ProcessRepliesIfAny(void)
469 {
470         unsigned char firstchar;
471         int                     r;
472
473         for (;;)
474         {
475                 r = pq_getbyte_if_available(&firstchar);
476                 if (r < 0)
477                 {
478                         /* unexpected error or EOF */
479                         ereport(COMMERROR,
480                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
481                                          errmsg("unexpected EOF on standby connection")));
482                         proc_exit(0);
483                 }
484                 if (r == 0)
485                 {
486                         /* no data available without blocking */
487                         return;
488                 }
489
490                 /* Handle the very limited subset of commands expected in this phase */
491                 switch (firstchar)
492                 {
493                                 /*
494                                  * 'd' means a standby reply wrapped in a CopyData packet.
495                                  */
496                         case 'd':
497                                 ProcessStandbyMessage();
498                                 break;
499
500                                 /*
501                                  * 'X' means that the standby is closing down the socket.
502                                  */
503                         case 'X':
504                                 proc_exit(0);
505
506                         default:
507                                 ereport(FATAL,
508                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
509                                                  errmsg("invalid standby closing message type %d",
510                                                                 firstchar)));
511                 }
512         }
513 }
514
515 /*
516  * Process a status update message received from standby.
517  */
518 static void
519 ProcessStandbyMessage(void)
520 {
521         char msgtype;
522
523         resetStringInfo(&reply_message);
524
525         /*
526          * Read the message contents.
527          */
528         if (pq_getmessage(&reply_message, 0))
529         {
530                 ereport(COMMERROR,
531                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
532                                  errmsg("unexpected EOF on standby connection")));
533                 proc_exit(0);
534         }
535
536         /*
537          * Check message type from the first byte. At the moment, there is only
538          * one type.
539          */
540         msgtype = pq_getmsgbyte(&reply_message);
541
542         switch (msgtype)
543         {
544                 case 'r':
545                         ProcessStandbyReplyMessage();
546                         break;
547
548                 case 'h':
549                         ProcessStandbyHSFeedbackMessage();
550                         break;
551
552                 default:
553                         ereport(COMMERROR,
554                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
555                                          errmsg("unexpected message type %c", msgtype)));
556                         proc_exit(0);
557         }
558 }
559
560 /*
561  * Regular reply from standby advising of WAL positions on standby server.
562  */
563 static void
564 ProcessStandbyReplyMessage(void)
565 {
566         StandbyReplyMessage     reply;
567
568         pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
569
570         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
571                  reply.write.xlogid, reply.write.xrecoff,
572                  reply.flush.xlogid, reply.flush.xrecoff,
573                  reply.apply.xlogid, reply.apply.xrecoff);
574
575         /*
576          * Update shared state for this WalSender process
577          * based on reply data from standby.
578          */
579         {
580                 /* use volatile pointer to prevent code rearrangement */
581                 volatile WalSnd *walsnd = MyWalSnd;
582
583                 SpinLockAcquire(&walsnd->mutex);
584                 walsnd->write = reply.write;
585                 walsnd->flush = reply.flush;
586                 walsnd->apply = reply.apply;
587                 SpinLockRelease(&walsnd->mutex);
588         }
589
590         SyncRepReleaseWaiters();
591 }
592
/*
 * Hot Standby feedback
 *
 * Handle an 'h' message from the standby, which carries an xmin and its
 * epoch.  MyProc->xmin is advanced to the reported xmin only when all of
 * these hold:
 *	- the reported xmin is valid;
 *	- its epoch is consistent with our next XID;
 *	- it does not lie beyond our next XID.
 *
 * The xmin is never moved backwards.  When no xmin was held before, the new
 * value is clamped up to GetOldestXmin().  In every other case newxmin stays
 * InvalidTransactionId, so a bad or empty reply clears any xmin this
 * walsender was holding.
 */
static void
ProcessStandbyHSFeedbackMessage(void)
{
	StandbyHSFeedbackMessage reply;
	TransactionId newxmin = InvalidTransactionId;

	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));

	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
		 reply.xmin,
		 reply.epoch);

	/*
	 * Update the WalSender's proc xmin to allow it to be visible
	 * to snapshots. This will hold back the removal of dead rows
	 * and thereby prevent the generation of cleanup conflicts
	 * on the standby server.
	 */
	if (TransactionIdIsValid(reply.xmin))
	{
		TransactionId nextXid;
		uint32		nextEpoch;
		bool		epochOK = false;

		GetNextXidAndEpoch(&nextXid, &nextEpoch);

		/*
		 * Epoch of oldestXmin should be same as standby or
		 * if the counter has wrapped, then one less than reply.
		 *
		 * An xmin numerically <= nextXid is taken to belong to the current
		 * epoch.  A larger one must predate the last XID wraparound, so its
		 * epoch must be exactly one less than ours.
		 */
		if (reply.xmin <= nextXid)
		{
			if (reply.epoch == nextEpoch)
				epochOK = true;
		}
		else
		{
			/* nextEpoch > 0 guards the unsigned nextEpoch - 1 below */
			if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
				epochOK = true;
		}

		/*
		 * Feedback from standby must not go backwards, nor should it go
		 * forwards further than our most recent xid.
		 */
		if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
		{
			if (!TransactionIdIsValid(MyProc->xmin))
			{
				/* No xmin held yet: use the later of oldestXmin and reply.xmin */
				TransactionId oldestXmin = GetOldestXmin(true, true);
				if (TransactionIdPrecedes(oldestXmin, reply.xmin))
					newxmin = reply.xmin;
				else
					newxmin = oldestXmin;
			}
			else
			{
				/* Already holding an xmin: only ever advance it */
				if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
					newxmin = reply.xmin;
				else
					newxmin = MyProc->xmin; /* stay the same */
			}
		}
	}

	/*
	 * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
	 *
	 * The lock is taken only when the value actually changes.
	 * NOTE(review): shared mode is presumably sufficient because only this
	 * process writes its own MyProc->xmin -- confirm against the procarray
	 * locking rules.
	 */
	if (MyProc->xmin != newxmin)
	{
		LWLockAcquire(ProcArrayLock, LW_SHARED);
		MyProc->xmin = newxmin;
		LWLockRelease(ProcArrayLock);
	}
}
671
/*
 * Main loop of walsender process
 *
 * Each iteration does the following:
 *	- bails out if the postmaster has died;
 *	- reloads the configuration on SIGHUP;
 *	- services shutdown requests;
 *	- sends pending WAL with XLogSend();
 *	- moves from CATCHUP to STREAMING state once caught up;
 *	- processes standby replies.
 *
 * When caught up, it sleeps on this walsender's latch or the client socket,
 * with a timeout derived from WalSndDelay.
 *
 * The process always ends via proc_exit(0): either after sending "COPY 0"
 * once a shutdown is requested, or after XLogSend() reports a send failure.
 * The trailing "return 1" only silences the compiler.
 */
static int
WalSndLoop(void)
{
	char	   *output_message;
	bool		caughtup = false;

	/*
	 * Allocate buffer that will be used for each output message.  We do this
	 * just once to reduce palloc overhead.  The buffer must be made large
	 * enough for maximum-sized messages.
	 */
	output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);

	/*
	 * Allocate buffer that will be used for processing reply messages.  As
	 * above, do this just once to reduce palloc overhead.
	 */
	initStringInfo(&reply_message);

	/* Loop forever, unless we get an error */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive(true))
			exit(1);

		/* Process any requests or signals received recently */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
			/* re-read sync rep settings, which may have changed */
			SyncRepInitConfig();
		}

		/*
		 * When SIGUSR2 arrives, we send all outstanding logs up to the
		 * shutdown checkpoint record (i.e., the latest record) and exit.
		 */
		if (walsender_ready_to_stop)
		{
			if (!XLogSend(output_message, &caughtup))
				break;
			ProcessRepliesIfAny();
			/* Everything has been sent: convert into a normal shutdown below */
			if (caughtup)
				walsender_shutdown_requested = true;
		}

		/* Normal exit from the walsender is here */
		if (walsender_shutdown_requested)
		{
			/* Inform the standby that XLOG streaming was done */
			pq_puttextmessage('C', "COPY 0");
			pq_flush();

			proc_exit(0);
		}

		/*
		 * If we had sent all accumulated WAL in last round, nap for the
		 * configured time before retrying.
		 */
		if (caughtup)
		{
			/*
			 * Even if we wrote all the WAL that was available when we started
			 * sending, more might have arrived while we were sending this
			 * batch. We had the latch set while sending, so we have not
			 * received any signals from that time. Let's arm the latch
			 * again, and after that check that we're still up-to-date.
			 */
			ResetLatch(&MyWalSnd->latch);

			if (!XLogSend(output_message, &caughtup))
				break;
			/* Sleep only if still caught up and no flag-driven work is pending */
			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
			{
				/*
				 * XXX: We don't really need the periodic wakeups anymore,
				 * WaitLatchOrSocket should reliably wake up as soon as
				 * something interesting happens.
				 */

				/* Sleep */
				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
								  WalSndDelay * 1000L);
			}
		}
		else
		{
			/* Attempt to send the log once every loop */
			if (!XLogSend(output_message, &caughtup))
				break;
		}

		/*
		 * If we're in catchup state, see if its time to move to streaming.
		 * This is an important state change for users, since before this
		 * point data loss might occur if the primary dies and we need to
		 * failover to the standby. The state change is also important for
		 * synchronous replication, since commits that started to wait at
		 * that point might wait for some time.
		 */
		if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
		{
			ereport(DEBUG1,
					(errmsg("standby \"%s\" has now caught up with primary",
									application_name)));
			WalSndSetState(WALSNDSTATE_STREAMING);
		}

		/* Handle any replies or termination sent by the standby */
		ProcessRepliesIfAny();
	}

	/*
	 * Get here on send failure.  Clean up and exit.
	 *
	 * Reset whereToSendOutput to prevent ereport from attempting to send any
	 * more messages to the standby.
	 */
	if (whereToSendOutput == DestRemote)
		whereToSendOutput = DestNone;

	proc_exit(0);
	return 1;					/* keep the compiler quiet */
}
801
802 /* Initialize a per-walsender data structure for this walsender process */
803 static void
804 InitWalSnd(void)
805 {
806         int                     i;
807
808         /*
809          * WalSndCtl should be set up already (we inherit this by fork() or
810          * EXEC_BACKEND mechanism from the postmaster).
811          */
812         Assert(WalSndCtl != NULL);
813         Assert(MyWalSnd == NULL);
814
815         /*
816          * Find a free walsender slot and reserve it. If this fails, we must be
817          * out of WalSnd structures.
818          */
819         for (i = 0; i < max_wal_senders; i++)
820         {
821                 /* use volatile pointer to prevent code rearrangement */
822                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
823
824                 SpinLockAcquire(&walsnd->mutex);
825
826                 if (walsnd->pid != 0)
827                 {
828                         SpinLockRelease(&walsnd->mutex);
829                         continue;
830                 }
831                 else
832                 {
833                         /*
834                          * Found a free slot. Reserve it for us.
835                          */
836                         walsnd->pid = MyProcPid;
837                         MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
838                         walsnd->state = WALSNDSTATE_STARTUP;
839                         SpinLockRelease(&walsnd->mutex);
840                         /* don't need the lock anymore */
841                         OwnLatch((Latch *) &walsnd->latch);
842                         MyWalSnd = (WalSnd *) walsnd;
843
844                         break;
845                 }
846         }
847         if (MyWalSnd == NULL)
848                 ereport(FATAL,
849                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
850                                  errmsg("number of requested standby connections "
851                                                 "exceeds max_wal_senders (currently %d)",
852                                                 max_wal_senders)));
853
854         /* Arrange to clean up at walsender exit */
855         on_shmem_exit(WalSndKill, 0);
856 }
857
858 /* Destroy the per-walsender data structure for this walsender process */
859 static void
860 WalSndKill(int code, Datum arg)
861 {
862         Assert(MyWalSnd != NULL);
863
864         /*
865          * Mark WalSnd struct no longer in use. Assume that no lock is required
866          * for this.
867          */
868         MyWalSnd->pid = 0;
869         DisownLatch(&MyWalSnd->latch);
870
871         /* WalSnd struct isn't mine anymore */
872         MyWalSnd = NULL;
873 }
874
875 /*
876  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
877  *
878  * XXX probably this should be improved to suck data directly from the
879  * WAL buffers when possible.
880  *
881  * Will open, and keep open, one WAL segment stored in the global file
882  * descriptor sendFile. This means if XLogRead is used once, there will
883  * always be one descriptor left open until the process ends, but never
884  * more than one.
885  */
886 void
887 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
888 {
889         XLogRecPtr      startRecPtr = recptr;
890         char            path[MAXPGPATH];
891         uint32          lastRemovedLog;
892         uint32          lastRemovedSeg;
893         uint32          log;
894         uint32          seg;
895
896         while (nbytes > 0)
897         {
898                 uint32          startoff;
899                 int                     segbytes;
900                 int                     readbytes;
901
902                 startoff = recptr.xrecoff % XLogSegSize;
903
904                 if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
905                 {
906                         /* Switch to another logfile segment */
907                         if (sendFile >= 0)
908                                 close(sendFile);
909
910                         XLByteToSeg(recptr, sendId, sendSeg);
911                         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
912
913                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
914                         if (sendFile < 0)
915                         {
916                                 /*
917                                  * If the file is not found, assume it's because the standby
918                                  * asked for a too old WAL segment that has already been
919                                  * removed or recycled.
920                                  */
921                                 if (errno == ENOENT)
922                                 {
923                                         char            filename[MAXFNAMELEN];
924
925                                         XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
926                                         ereport(ERROR,
927                                                         (errcode_for_file_access(),
928                                                          errmsg("requested WAL segment %s has already been removed",
929                                                                         filename)));
930                                 }
931                                 else
932                                         ereport(ERROR,
933                                                         (errcode_for_file_access(),
934                                                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
935                                                                         path, sendId, sendSeg)));
936                         }
937                         sendOff = 0;
938                 }
939
940                 /* Need to seek in the file? */
941                 if (sendOff != startoff)
942                 {
943                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
944                                 ereport(ERROR,
945                                                 (errcode_for_file_access(),
946                                                  errmsg("could not seek in log file %u, segment %u to offset %u: %m",
947                                                                 sendId, sendSeg, startoff)));
948                         sendOff = startoff;
949                 }
950
951                 /* How many bytes are within this segment? */
952                 if (nbytes > (XLogSegSize - startoff))
953                         segbytes = XLogSegSize - startoff;
954                 else
955                         segbytes = nbytes;
956
957                 readbytes = read(sendFile, buf, segbytes);
958                 if (readbytes <= 0)
959                         ereport(ERROR,
960                                         (errcode_for_file_access(),
961                         errmsg("could not read from log file %u, segment %u, offset %u, "
962                                    "length %lu: %m",
963                                    sendId, sendSeg, sendOff, (unsigned long) segbytes)));
964
965                 /* Update state for read */
966                 XLByteAdvance(recptr, readbytes);
967
968                 sendOff += readbytes;
969                 nbytes -= readbytes;
970                 buf += readbytes;
971         }
972
973         /*
974          * After reading into the buffer, check that what we read was valid. We do
975          * this after reading, because even though the segment was present when we
976          * opened it, it might get recycled or removed while we read it. The
977          * read() succeeds in that case, but the data we tried to read might
978          * already have been overwritten with new WAL records.
979          */
980         XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
981         XLByteToSeg(startRecPtr, log, seg);
982         if (log < lastRemovedLog ||
983                 (log == lastRemovedLog && seg <= lastRemovedSeg))
984         {
985                 char            filename[MAXFNAMELEN];
986
987                 XLogFileName(filename, ThisTimeLineID, log, seg);
988                 ereport(ERROR,
989                                 (errcode_for_file_access(),
990                                  errmsg("requested WAL segment %s has already been removed",
991                                                 filename)));
992         }
993 }
994
995 /*
996  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
997  * but not yet sent to the client, and send it.
998  *
999  * msgbuf is a work area in which the output message is constructed.  It's
1000  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
1001  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
1002  *
1003  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
1004  * *caughtup is set to false.
1005  *
1006  * Returns true if OK, false if trouble.
1007  */
1008 static bool
1009 XLogSend(char *msgbuf, bool *caughtup)
1010 {
1011         XLogRecPtr      SendRqstPtr;
1012         XLogRecPtr      startptr;
1013         XLogRecPtr      endptr;
1014         Size            nbytes;
1015         WalDataMessageHeader msghdr;
1016
1017         /*
1018          * Attempt to send all data that's already been written out and fsync'd to
1019          * disk.  We cannot go further than what's been written out given the
1020          * current implementation of XLogRead().  And in any case it's unsafe to
1021          * send WAL that is not securely down to disk on the master: if the master
1022          * subsequently crashes and restarts, slaves must not have applied any WAL
1023          * that gets lost on the master.
1024          */
1025         SendRqstPtr = GetFlushRecPtr();
1026
1027         /* Quick exit if nothing to do */
1028         if (XLByteLE(SendRqstPtr, sentPtr))
1029         {
1030                 *caughtup = true;
1031                 return true;
1032         }
1033
1034         /*
1035          * Figure out how much to send in one message. If there's no more than
1036          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
1037          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
1038          *
1039          * The rounding is not only for performance reasons. Walreceiver relies on
1040          * the fact that we never split a WAL record across two messages. Since a
1041          * long WAL record is split at page boundary into continuation records,
1042          * page boundary is always a safe cut-off point. We also assume that
1043          * SendRqstPtr never points to the middle of a WAL record.
1044          */
1045         startptr = sentPtr;
1046         if (startptr.xrecoff >= XLogFileSize)
1047         {
1048                 /*
1049                  * crossing a logid boundary, skip the non-existent last log segment
1050                  * in previous logical log file.
1051                  */
1052                 startptr.xlogid += 1;
1053                 startptr.xrecoff = 0;
1054         }
1055
1056         endptr = startptr;
1057         XLByteAdvance(endptr, MAX_SEND_SIZE);
1058         if (endptr.xlogid != startptr.xlogid)
1059         {
1060                 /* Don't cross a logfile boundary within one message */
1061                 Assert(endptr.xlogid == startptr.xlogid + 1);
1062                 endptr.xlogid = startptr.xlogid;
1063                 endptr.xrecoff = XLogFileSize;
1064         }
1065
1066         /* if we went beyond SendRqstPtr, back off */
1067         if (XLByteLE(SendRqstPtr, endptr))
1068         {
1069                 endptr = SendRqstPtr;
1070                 *caughtup = true;
1071         }
1072         else
1073         {
1074                 /* round down to page boundary. */
1075                 endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
1076                 *caughtup = false;
1077         }
1078
1079         nbytes = endptr.xrecoff - startptr.xrecoff;
1080         Assert(nbytes <= MAX_SEND_SIZE);
1081
1082         /*
1083          * OK to read and send the slice.
1084          */
1085         msgbuf[0] = 'w';
1086
1087         /*
1088          * Read the log directly into the output buffer to avoid extra memcpy
1089          * calls.
1090          */
1091         XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
1092
1093         /*
1094          * We fill the message header last so that the send timestamp is taken as
1095          * late as possible.
1096          */
1097         msghdr.dataStart = startptr;
1098         msghdr.walEnd = SendRqstPtr;
1099         msghdr.sendTime = GetCurrentTimestamp();
1100
1101         memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
1102
1103         pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
1104
1105         /* Flush pending output to the client */
1106         if (pq_flush())
1107                 return false;
1108
1109         sentPtr = endptr;
1110
1111         /* Update shared memory status */
1112         {
1113                 /* use volatile pointer to prevent code rearrangement */
1114                 volatile WalSnd *walsnd = MyWalSnd;
1115
1116                 SpinLockAcquire(&walsnd->mutex);
1117                 walsnd->sentPtr = sentPtr;
1118                 SpinLockRelease(&walsnd->mutex);
1119         }
1120
1121         /* Report progress of XLOG streaming in PS display */
1122         if (update_process_title)
1123         {
1124                 char            activitymsg[50];
1125
1126                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1127                                  sentPtr.xlogid, sentPtr.xrecoff);
1128                 set_ps_display(activitymsg, false);
1129         }
1130
1131         return true;
1132 }
1133
1134 /* SIGHUP: set flag to re-read config file at next convenient time */
1135 static void
1136 WalSndSigHupHandler(SIGNAL_ARGS)
1137 {
1138         got_SIGHUP = true;
1139         if (MyWalSnd)
1140                 SetLatch(&MyWalSnd->latch);
1141 }
1142
1143 /* SIGTERM: set flag to shut down */
1144 static void
1145 WalSndShutdownHandler(SIGNAL_ARGS)
1146 {
1147         walsender_shutdown_requested = true;
1148         if (MyWalSnd)
1149                 SetLatch(&MyWalSnd->latch);
1150 }
1151
1152 /*
1153  * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
1154  *
1155  * Some backend has bought the farm,
1156  * so we need to stop what we're doing and exit.
1157  */
1158 static void
1159 WalSndQuickDieHandler(SIGNAL_ARGS)
1160 {
1161         PG_SETMASK(&BlockSig);
1162
1163         /*
1164          * We DO NOT want to run proc_exit() callbacks -- we're here because
1165          * shared memory may be corrupted, so we don't want to try to clean up our
1166          * transaction.  Just nail the windows shut and get out of town.  Now that
1167          * there's an atexit callback to prevent third-party code from breaking
1168          * things by calling exit() directly, we have to reset the callbacks
1169          * explicitly to make this work as intended.
1170          */
1171         on_exit_reset();
1172
1173         /*
1174          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
1175          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
1176          * backend.  This is necessary precisely because we don't clean up our
1177          * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
1178          * should ensure the postmaster sees this as a crash, too, but no harm in
1179          * being doubly sure.)
1180          */
1181         exit(2);
1182 }
1183
/*
 * SIGUSR1: latch wakeup.
 *
 * Forwards the signal to the latch machinery via latch_sigusr1_handler(),
 * which wakes this process if it is waiting on its latch (for example,
 * when there is new WAL to send).  errno is preserved because that
 * handler may make system calls.
 */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
	int			save_errno = errno;

	latch_sigusr1_handler();

	errno = save_errno;
}
1190
1191 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1192 static void
1193 WalSndLastCycleHandler(SIGNAL_ARGS)
1194 {
1195         walsender_ready_to_stop = true;
1196         if (MyWalSnd)
1197                 SetLatch(&MyWalSnd->latch);
1198 }
1199
1200 /* Set up signal handlers */
1201 void
1202 WalSndSignals(void)
1203 {
1204         /* Set up signal handlers */
1205         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
1206                                                                                                  * file */
1207         pqsignal(SIGINT, SIG_IGN);      /* not used */
1208         pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
1209         pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
1210         pqsignal(SIGALRM, SIG_IGN);
1211         pqsignal(SIGPIPE, SIG_IGN);
1212         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
1213         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
1214                                                                                                  * shutdown */
1215
1216         /* Reset some signals that are accepted by postmaster but not here */
1217         pqsignal(SIGCHLD, SIG_DFL);
1218         pqsignal(SIGTTIN, SIG_DFL);
1219         pqsignal(SIGTTOU, SIG_DFL);
1220         pqsignal(SIGCONT, SIG_DFL);
1221         pqsignal(SIGWINCH, SIG_DFL);
1222 }
1223
1224 /* Report shared-memory space needed by WalSndShmemInit */
1225 Size
1226 WalSndShmemSize(void)
1227 {
1228         Size            size = 0;
1229
1230         size = offsetof(WalSndCtlData, walsnds);
1231         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1232
1233         return size;
1234 }
1235
1236 /* Allocate and initialize walsender-related shared memory */
1237 void
1238 WalSndShmemInit(void)
1239 {
1240         bool            found;
1241         int                     i;
1242
1243         WalSndCtl = (WalSndCtlData *)
1244                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1245
1246         if (!found)
1247         {
1248                 /* First time through, so initialize */
1249                 MemSet(WalSndCtl, 0, WalSndShmemSize());
1250
1251                 SHMQueueInit(&(WalSndCtl->SyncRepQueue));
1252
1253                 for (i = 0; i < max_wal_senders; i++)
1254                 {
1255                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
1256
1257                         SpinLockInit(&walsnd->mutex);
1258                         InitSharedLatch(&walsnd->latch);
1259                 }
1260         }
1261 }
1262
1263 /* Wake up all walsenders */
1264 void
1265 WalSndWakeup(void)
1266 {
1267         int             i;
1268
1269         for (i = 0; i < max_wal_senders; i++)
1270                 SetLatch(&WalSndCtl->walsnds[i].latch);
1271 }
1272
1273 /* Set state for current walsender (only called in walsender) */
1274 void
1275 WalSndSetState(WalSndState state)
1276 {
1277         /* use volatile pointer to prevent code rearrangement */
1278         volatile WalSnd *walsnd = MyWalSnd;
1279
1280         Assert(am_walsender);
1281
1282         if (walsnd->state == state)
1283                 return;
1284
1285         SpinLockAcquire(&walsnd->mutex);
1286         walsnd->state = state;
1287         SpinLockRelease(&walsnd->mutex);
1288 }
1289
1290 /*
1291  * Return a string constant representing the state. This is used
1292  * in system views, and should *not* be translated.
1293  */
1294 static const char *
1295 WalSndGetStateString(WalSndState state)
1296 {
1297         switch (state)
1298         {
1299                 case WALSNDSTATE_STARTUP:
1300                         return "STARTUP";
1301                 case WALSNDSTATE_BACKUP:
1302                         return "BACKUP";
1303                 case WALSNDSTATE_CATCHUP:
1304                         return "CATCHUP";
1305                 case WALSNDSTATE_STREAMING:
1306                         return "STREAMING";
1307         }
1308         return "UNKNOWN";
1309 }
1310
1311
1312 /*
1313  * Returns activity of walsenders, including pids and xlog locations sent to
1314  * standby servers.
1315  */
1316 Datum
1317 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1318 {
1319 #define PG_STAT_GET_WAL_SENDERS_COLS    8
1320         ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1321         TupleDesc                       tupdesc;
1322         Tuplestorestate    *tupstore;
1323         MemoryContext           per_query_ctx;
1324         MemoryContext           oldcontext;
1325         int                                     sync_priority[max_wal_senders];
1326         int                                     priority = 0;
1327         int                                     sync_standby = -1;
1328         int                                     i;
1329
1330         /* check to see if caller supports us returning a tuplestore */
1331         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1332                 ereport(ERROR,
1333                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1334                                  errmsg("set-valued function called in context that cannot accept a set")));
1335         if (!(rsinfo->allowedModes & SFRM_Materialize))
1336                 ereport(ERROR,
1337                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338                                  errmsg("materialize mode required, but it is not " \
1339                                                 "allowed in this context")));
1340
1341         /* Build a tuple descriptor for our result type */
1342         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1343                 elog(ERROR, "return type must be a row type");
1344
1345         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1346         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1347
1348         tupstore = tuplestore_begin_heap(true, false, work_mem);
1349         rsinfo->returnMode = SFRM_Materialize;
1350         rsinfo->setResult = tupstore;
1351         rsinfo->setDesc = tupdesc;
1352
1353         MemoryContextSwitchTo(oldcontext);
1354
1355         /*
1356          * Get the priorities of sync standbys all in one go, to minimise
1357          * lock acquisitions and to allow us to evaluate who is the current
1358          * sync standby. This code must match the code in SyncRepReleaseWaiters().
1359          */
1360         LWLockAcquire(SyncRepLock, LW_SHARED);
1361         for (i = 0; i < max_wal_senders; i++)
1362         {
1363                 /* use volatile pointer to prevent code rearrangement */
1364                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1365
1366                 if (walsnd->pid != 0)
1367                 {
1368                         sync_priority[i] = walsnd->sync_standby_priority;
1369
1370                         if (walsnd->state == WALSNDSTATE_STREAMING &&
1371                                 walsnd->sync_standby_priority > 0 &&
1372                                 (priority == 0 ||
1373                                  priority > walsnd->sync_standby_priority))
1374                         {
1375                                 priority = walsnd->sync_standby_priority;
1376                                 sync_standby = i;
1377                         }
1378                 }
1379         }
1380         LWLockRelease(SyncRepLock);
1381
1382         for (i = 0; i < max_wal_senders; i++)
1383         {
1384                 /* use volatile pointer to prevent code rearrangement */
1385                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1386                 char            location[MAXFNAMELEN];
1387                 XLogRecPtr      sentPtr;
1388                 XLogRecPtr      write;
1389                 XLogRecPtr      flush;
1390                 XLogRecPtr      apply;
1391                 WalSndState     state;
1392                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1393                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1394
1395                 if (walsnd->pid == 0)
1396                         continue;
1397
1398                 SpinLockAcquire(&walsnd->mutex);
1399                 sentPtr = walsnd->sentPtr;
1400                 state = walsnd->state;
1401                 write = walsnd->write;
1402                 flush = walsnd->flush;
1403                 apply = walsnd->apply;
1404                 SpinLockRelease(&walsnd->mutex);
1405
1406                 memset(nulls, 0, sizeof(nulls));
1407                 values[0] = Int32GetDatum(walsnd->pid);
1408
1409                 if (!superuser())
1410                 {
1411                         /*
1412                          * Only superusers can see details. Other users only get
1413                          * the pid value to know it's a walsender, but no details.
1414                          */
1415                         MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
1416                 }
1417                 else
1418                 {
1419                         values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1420
1421                         snprintf(location, sizeof(location), "%X/%X",
1422                                          sentPtr.xlogid, sentPtr.xrecoff);
1423                         values[2] = CStringGetTextDatum(location);
1424
1425                         if (write.xlogid == 0 && write.xrecoff == 0)
1426                                 nulls[3] = true;
1427                         snprintf(location, sizeof(location), "%X/%X",
1428                                          write.xlogid, write.xrecoff);
1429                         values[3] = CStringGetTextDatum(location);
1430
1431                         if (flush.xlogid == 0 && flush.xrecoff == 0)
1432                                 nulls[4] = true;
1433                         snprintf(location, sizeof(location), "%X/%X",
1434                                         flush.xlogid, flush.xrecoff);
1435                         values[4] = CStringGetTextDatum(location);
1436
1437                         if (apply.xlogid == 0 && apply.xrecoff == 0)
1438                                 nulls[5] = true;
1439                         snprintf(location, sizeof(location), "%X/%X",
1440                                          apply.xlogid, apply.xrecoff);
1441                         values[5] = CStringGetTextDatum(location);
1442
1443                         values[6] = Int32GetDatum(sync_priority[i]);
1444
1445                         /*
1446                          * More easily understood version of standby state.
1447                          * This is purely informational, not different from priority.
1448                          */
1449                         if (sync_priority[i] == 0)
1450                                 values[7] = CStringGetTextDatum("ASYNC");
1451                         else if (i == sync_standby)
1452                                 values[7] = CStringGetTextDatum("SYNC");
1453                         else
1454                                 values[7] = CStringGetTextDatum("POTENTIAL");
1455                 }
1456
1457                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1458         }
1459
1460         /* clean up and return the tuplestore */
1461         tuplestore_donestoring(tupstore);
1462
1463         return (Datum) 0;
1464 }
1465
/*
 * GetOldestWALSendPointer() has no callers at present, so it is compiled
 * out.  It is kept because monitoring tools might want it in the future.
 */
#ifdef NOT_USED
/*
 * Returns the oldest send position among active walsenders.  Slots that
 * are free (pid 0), or whose sentPtr is still 0/0, are skipped.  Returns
 * 0/0 (InvalidXLogRecPtr) when no slot qualifies.
 */
XLogRecPtr
GetOldestWALSendPointer(void)
{
	XLogRecPtr	result = {0, 0};
	bool		haveAny = false;
	int			slot;

	for (slot = 0; slot < max_wal_senders; slot++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[slot];
		XLogRecPtr	candidate;

		if (walsnd->pid == 0)
			continue;

		SpinLockAcquire(&walsnd->mutex);
		candidate = walsnd->sentPtr;
		SpinLockRelease(&walsnd->mutex);

		/* presumably 0/0 means this walsender has not sent anything yet */
		if (candidate.xlogid == 0 && candidate.xrecoff == 0)
			continue;

		/* keep the current minimum unless this one is strictly older */
		if (haveAny && !XLByteLT(candidate, result))
			continue;

		result = candidate;
		haveAny = true;
	}
	return result;
}

#endif