]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Fix blatantly uninitialized variable in recent commit.
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  * It attempts to keep reading XLOG records from the disk and sending them
11  * to the standby server, as long as the connection is alive (i.e., like
12  * any backend, there is a one-to-one relationship between a connection
13  * and a walsender process).
14  *
15  * Normal termination is by SIGTERM, which instructs the walsender to
16  * close the connection and exit(0) at next convenient moment. Emergency
17  * termination is by SIGQUIT; like any backend, the walsender will simply
18  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
19  * are treated as not a crash but approximately normal termination;
20  * the walsender will exit quickly without sending any more XLOG records.
21  *
22  * If the server is shut down, postmaster sends us SIGUSR2 after all
23  * regular backends have exited and the shutdown checkpoint has been written.
24  * This instructs walsender to send any outstanding WAL, including the
25  * shutdown checkpoint record, and then exit.
26  *
27  *
28  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
29  *
30  * IDENTIFICATION
31  *        src/backend/replication/walsender.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres.h"
36
37 #include <signal.h>
38 #include <unistd.h>
39
40 #include "funcapi.h"
41 #include "access/xlog_internal.h"
42 #include "access/transam.h"
43 #include "catalog/pg_type.h"
44 #include "libpq/libpq.h"
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47 #include "miscadmin.h"
48 #include "replication/basebackup.h"
49 #include "replication/replnodes.h"
50 #include "replication/walprotocol.h"
51 #include "replication/walsender.h"
52 #include "storage/fd.h"
53 #include "storage/ipc.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "storage/procarray.h"
57 #include "tcop/tcopprot.h"
58 #include "utils/builtins.h"
59 #include "utils/guc.h"
60 #include "utils/memutils.h"
61 #include "utils/ps_status.h"
62 #include "utils/resowner.h"
63
64
65 /* Array of WalSnds in shared memory */
66 WalSndCtlData *WalSndCtl = NULL;
67
68 /* My slot in the shared memory array */
69 static WalSnd *MyWalSnd = NULL;
70
71 /* Global state */
72 bool            am_walsender = false;           /* Am I a walsender process ? */
73
74 /* User-settable parameters for walsender */
75 int                     max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
76 int                     WalSndDelay = 200;      /* max sleep time between some actions */
77
78 /*
79  * These variables are used similarly to openLogFile/Id/Seg/Off,
80  * but for walsender to read the XLOG.
81  */
82 static int      sendFile = -1;
83 static uint32 sendId = 0;
84 static uint32 sendSeg = 0;
85 static uint32 sendOff = 0;
86
87 /*
88  * How far have we sent WAL already? This is also advertised in
89  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
90  */
91 static XLogRecPtr sentPtr = {0, 0};
92
93 /*
94  * Buffer for processing reply messages.
95  */
96 static StringInfoData reply_message;
97
98 /* Flags set by signal handlers for later service in main loop */
99 static volatile sig_atomic_t got_SIGHUP = false;
100 volatile sig_atomic_t walsender_shutdown_requested = false;
101 volatile sig_atomic_t walsender_ready_to_stop = false;
102
103 /* Signal handlers */
104 static void WalSndSigHupHandler(SIGNAL_ARGS);
105 static void WalSndShutdownHandler(SIGNAL_ARGS);
106 static void WalSndQuickDieHandler(SIGNAL_ARGS);
107 static void WalSndXLogSendHandler(SIGNAL_ARGS);
108 static void WalSndLastCycleHandler(SIGNAL_ARGS);
109
110 /* Prototypes for private functions */
111 static bool HandleReplicationCommand(const char *cmd_string);
112 static int      WalSndLoop(void);
113 static void InitWalSnd(void);
114 static void WalSndHandshake(void);
115 static void WalSndKill(int code, Datum arg);
116 static bool XLogSend(char *msgbuf, bool *caughtup);
117 static void IdentifySystem(void);
118 static void StartReplication(StartReplicationCmd * cmd);
119 static void ProcessStandbyReplyMessage(void);
120 static void ProcessRepliesIfAny(void);
121
122
123 /* Main entry point for walsender process */
124 int
125 WalSenderMain(void)
126 {
127         MemoryContext walsnd_context;
128
129         if (RecoveryInProgress())
130                 ereport(FATAL,
131                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
132                                  errmsg("recovery is still in progress, can't accept WAL streaming connections")));
133
134         /* Create a per-walsender data structure in shared memory */
135         InitWalSnd();
136
137         /*
138          * Create a memory context that we will do all our work in.  We do this so
139          * that we can reset the context during error recovery and thereby avoid
140          * possible memory leaks.  Formerly this code just ran in
141          * TopMemoryContext, but resetting that would be a really bad idea.
142          *
143          * XXX: we don't actually attempt error recovery in walsender, we just
144          * close the connection and exit.
145          */
146         walsnd_context = AllocSetContextCreate(TopMemoryContext,
147                                                                                    "Wal Sender",
148                                                                                    ALLOCSET_DEFAULT_MINSIZE,
149                                                                                    ALLOCSET_DEFAULT_INITSIZE,
150                                                                                    ALLOCSET_DEFAULT_MAXSIZE);
151         MemoryContextSwitchTo(walsnd_context);
152
153         /* Set up resource owner */
154         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
155
156         /* Unblock signals (they were blocked when the postmaster forked us) */
157         PG_SETMASK(&UnBlockSig);
158
159         /* Tell the standby that walsender is ready for receiving commands */
160         ReadyForQuery(DestRemote);
161
162         /* Handle handshake messages before streaming */
163         WalSndHandshake();
164
165         /* Initialize shared memory status */
166         {
167                 /* use volatile pointer to prevent code rearrangement */
168                 volatile WalSnd *walsnd = MyWalSnd;
169
170                 SpinLockAcquire(&walsnd->mutex);
171                 walsnd->sentPtr = sentPtr;
172                 SpinLockRelease(&walsnd->mutex);
173         }
174
175         /* Main loop of walsender */
176         return WalSndLoop();
177 }
178
179 /*
180  * Execute commands from walreceiver, until we enter streaming mode.
181  */
182 static void
183 WalSndHandshake(void)
184 {
185         StringInfoData input_message;
186         bool            replication_started = false;
187
188         initStringInfo(&input_message);
189
190         while (!replication_started)
191         {
192                 int                     firstchar;
193
194                 WalSndSetState(WALSNDSTATE_STARTUP);
195                 set_ps_display("idle", false);
196
197                 /* Wait for a command to arrive */
198                 firstchar = pq_getbyte();
199
200                 /*
201                  * Emergency bailout if postmaster has died.  This is to avoid the
202                  * necessity for manual cleanup of all postmaster children.
203                  */
204                 if (!PostmasterIsAlive(true))
205                         exit(1);
206
207                 /*
208                  * Check for any other interesting events that happened while we
209                  * slept.
210                  */
211                 if (got_SIGHUP)
212                 {
213                         got_SIGHUP = false;
214                         ProcessConfigFile(PGC_SIGHUP);
215                 }
216
217                 if (firstchar != EOF)
218                 {
219                         /*
220                          * Read the message contents. This is expected to be done without
221                          * blocking because we've been able to get message type code.
222                          */
223                         if (pq_getmessage(&input_message, 0))
224                                 firstchar = EOF;        /* suitable message already logged */
225                 }
226
227                 /* Handle the very limited subset of commands expected in this phase */
228                 switch (firstchar)
229                 {
230                         case 'Q':                       /* Query message */
231                                 {
232                                         const char *query_string;
233
234                                         query_string = pq_getmsgstring(&input_message);
235                                         pq_getmsgend(&input_message);
236
237                                         if (HandleReplicationCommand(query_string))
238                                                 replication_started = true;
239                                 }
240                                 break;
241
242                         case 'X':
243                                 /* standby is closing the connection */
244                                 proc_exit(0);
245
246                         case EOF:
247                                 /* standby disconnected unexpectedly */
248                                 ereport(COMMERROR,
249                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
250                                                  errmsg("unexpected EOF on standby connection")));
251                                 proc_exit(0);
252
253                         default:
254                                 ereport(FATAL,
255                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
256                                                  errmsg("invalid standby handshake message type %d", firstchar)));
257                 }
258         }
259 }
260
261 /*
262  * IDENTIFY_SYSTEM
263  */
264 static void
265 IdentifySystem(void)
266 {
267         StringInfoData buf;
268         char            sysid[32];
269         char            tli[11];
270         char            xpos[MAXFNAMELEN];
271         XLogRecPtr      logptr;
272
273         /*
274          * Reply with a result set with one row, three columns. First col is system
275          * ID, second is timeline ID, and third is current xlog location.
276          */
277
278         snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
279                          GetSystemIdentifier());
280         snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
281
282         logptr = GetInsertRecPtr();
283
284         snprintf(xpos, sizeof(xpos), "%X/%X",
285                          logptr.xlogid, logptr.xrecoff);
286
287         /* Send a RowDescription message */
288         pq_beginmessage(&buf, 'T');
289         pq_sendint(&buf, 3, 2);         /* 3 fields */
290
291         /* first field */
292         pq_sendstring(&buf, "systemid");        /* col name */
293         pq_sendint(&buf, 0, 4);         /* table oid */
294         pq_sendint(&buf, 0, 2);         /* attnum */
295         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
296         pq_sendint(&buf, -1, 2);        /* typlen */
297         pq_sendint(&buf, 0, 4);         /* typmod */
298         pq_sendint(&buf, 0, 2);         /* format code */
299
300         /* second field */
301         pq_sendstring(&buf, "timeline");        /* col name */
302         pq_sendint(&buf, 0, 4);         /* table oid */
303         pq_sendint(&buf, 0, 2);         /* attnum */
304         pq_sendint(&buf, INT4OID, 4);           /* type oid */
305         pq_sendint(&buf, 4, 2);         /* typlen */
306         pq_sendint(&buf, 0, 4);         /* typmod */
307         pq_sendint(&buf, 0, 2);         /* format code */
308
309         /* third field */
310         pq_sendstring(&buf, "xlogpos");
311         pq_sendint(&buf, 0, 4);
312         pq_sendint(&buf, 0, 2);
313         pq_sendint(&buf, TEXTOID, 4);
314         pq_sendint(&buf, -1, 2);
315         pq_sendint(&buf, 0, 4);
316         pq_sendint(&buf, 0, 2);
317         pq_endmessage(&buf);
318
319         /* Send a DataRow message */
320         pq_beginmessage(&buf, 'D');
321         pq_sendint(&buf, 3, 2);         /* # of columns */
322         pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
323         pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
324         pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
325         pq_sendbytes(&buf, (char *) tli, strlen(tli));
326         pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
327         pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
328
329         pq_endmessage(&buf);
330
331         /* Send CommandComplete and ReadyForQuery messages */
332         EndCommand("SELECT", DestRemote);
333         ReadyForQuery(DestRemote);
334         /* ReadyForQuery did pq_flush for us */
335 }
336
337 /*
338  * START_REPLICATION
339  */
340 static void
341 StartReplication(StartReplicationCmd * cmd)
342 {
343         StringInfoData buf;
344
345         /*
346          * Let postmaster know that we're streaming. Once we've declared us as
347          * a WAL sender process, postmaster will let us outlive the bgwriter and
348          * kill us last in the shutdown sequence, so we get a chance to stream
349          * all remaining WAL at shutdown, including the shutdown checkpoint.
350          * Note that there's no going back, and we mustn't write any WAL records
351          * after this.
352          */
353         MarkPostmasterChildWalSender();
354
355         /*
356          * Check that we're logging enough information in the WAL for
357          * log-shipping.
358          *
359          * NOTE: This only checks the current value of wal_level. Even if the
360          * current setting is not 'minimal', there can be old WAL in the pg_xlog
361          * directory that was created with 'minimal'. So this is not bulletproof,
362          * the purpose is just to give a user-friendly error message that hints
363          * how to configure the system correctly.
364          */
365         if (wal_level == WAL_LEVEL_MINIMAL)
366                 ereport(FATAL,
367                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
368                 errmsg("standby connections not allowed because wal_level=minimal")));
369
370         /* Send a CopyBothResponse message, and start streaming */
371         pq_beginmessage(&buf, 'W');
372         pq_sendbyte(&buf, 0);
373         pq_sendint(&buf, 0, 2);
374         pq_endmessage(&buf);
375         pq_flush();
376
377         /*
378          * Initialize position to the received one, then the xlog records begin to
379          * be shipped from that position
380          */
381         sentPtr = cmd->startpoint;
382 }
383
384 /*
385  * Execute an incoming replication command.
386  */
387 static bool
388 HandleReplicationCommand(const char *cmd_string)
389 {
390         bool            replication_started = false;
391         int                     parse_rc;
392         Node       *cmd_node;
393         MemoryContext cmd_context;
394         MemoryContext old_context;
395
396         elog(DEBUG1, "received replication command: %s", cmd_string);
397
398         cmd_context = AllocSetContextCreate(CurrentMemoryContext,
399                                                                                 "Replication command context",
400                                                                                 ALLOCSET_DEFAULT_MINSIZE,
401                                                                                 ALLOCSET_DEFAULT_INITSIZE,
402                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
403         old_context = MemoryContextSwitchTo(cmd_context);
404
405         replication_scanner_init(cmd_string);
406         parse_rc = replication_yyparse();
407         if (parse_rc != 0)
408                 ereport(ERROR,
409                                 (errcode(ERRCODE_SYNTAX_ERROR),
410                                  (errmsg_internal("replication command parser returned %d",
411                                                                   parse_rc))));
412
413         cmd_node = replication_parse_result;
414
415         switch (cmd_node->type)
416         {
417                 case T_IdentifySystemCmd:
418                         IdentifySystem();
419                         break;
420
421                 case T_StartReplicationCmd:
422                         StartReplication((StartReplicationCmd *) cmd_node);
423
424                         /* break out of the loop */
425                         replication_started = true;
426                         break;
427
428                 case T_BaseBackupCmd:
429                         SendBaseBackup((BaseBackupCmd *) cmd_node);
430
431                         /* Send CommandComplete and ReadyForQuery messages */
432                         EndCommand("SELECT", DestRemote);
433                         ReadyForQuery(DestRemote);
434                         /* ReadyForQuery did pq_flush for us */
435                         break;
436
437                 default:
438                         ereport(FATAL,
439                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
440                                          errmsg("invalid standby query string: %s", cmd_string)));
441         }
442
443         /* done */
444         MemoryContextSwitchTo(old_context);
445         MemoryContextDelete(cmd_context);
446
447         return replication_started;
448 }
449
450 /*
451  * Check if the remote end has closed the connection.
452  */
453 static void
454 ProcessRepliesIfAny(void)
455 {
456         unsigned char firstchar;
457         int                     r;
458
459         r = pq_getbyte_if_available(&firstchar);
460         if (r < 0)
461         {
462                 /* unexpected error or EOF */
463                 ereport(COMMERROR,
464                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
465                                  errmsg("unexpected EOF on standby connection")));
466                 proc_exit(0);
467         }
468         if (r == 0)
469         {
470                 /* no data available without blocking */
471                 return;
472         }
473
474         /* Handle the very limited subset of commands expected in this phase */
475         switch (firstchar)
476         {
477                         /*
478                          * 'd' means a standby reply wrapped in a CopyData packet.
479                          */
480                 case 'd':
481                         ProcessStandbyReplyMessage();
482                         break;
483
484                         /*
485                          * 'X' means that the standby is closing down the socket.
486                          */
487                 case 'X':
488                         proc_exit(0);
489
490                 default:
491                         ereport(FATAL,
492                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
493                                          errmsg("invalid standby closing message type %d",
494                                                         firstchar)));
495         }
496 }
497
498 /*
499  * Process a status update message received from standby.
500  */
501 static void
502 ProcessStandbyReplyMessage(void)
503 {
504         StandbyReplyMessage     reply;
505         char msgtype;
506         TransactionId newxmin = InvalidTransactionId;
507
508         resetStringInfo(&reply_message);
509
510         /*
511          * Read the message contents.
512          */
513         if (pq_getmessage(&reply_message, 0))
514         {
515                 ereport(COMMERROR,
516                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
517                                  errmsg("unexpected EOF on standby connection")));
518                 proc_exit(0);
519         }
520
521         /*
522          * Check message type from the first byte. At the moment, there is only
523          * one type.
524          */
525         msgtype = pq_getmsgbyte(&reply_message);
526         if (msgtype != 'r')
527         {
528                 ereport(COMMERROR,
529                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
530                                  errmsg("unexpected message type %c", msgtype)));
531                 proc_exit(0);
532         }
533
534         pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
535
536         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
537                  reply.write.xlogid, reply.write.xrecoff,
538                  reply.flush.xlogid, reply.flush.xrecoff,
539                  reply.apply.xlogid, reply.apply.xrecoff,
540                  reply.xmin,
541                  reply.epoch);
542
543         /*
544          * Update shared state for this WalSender process
545          * based on reply data from standby.
546          */
547         {
548                 /* use volatile pointer to prevent code rearrangement */
549                 volatile WalSnd *walsnd = MyWalSnd;
550
551                 SpinLockAcquire(&walsnd->mutex);
552                 walsnd->write = reply.write;
553                 walsnd->flush = reply.flush;
554                 walsnd->apply = reply.apply;
555                 SpinLockRelease(&walsnd->mutex);
556         }
557
558         /*
559          * Update the WalSender's proc xmin to allow it to be visible
560          * to snapshots. This will hold back the removal of dead rows
561          * and thereby prevent the generation of cleanup conflicts
562          * on the standby server.
563          */
564         if (TransactionIdIsValid(reply.xmin))
565         {
566                 TransactionId   nextXid;
567                 uint32                  nextEpoch;
568                 bool                    epochOK = false;
569
570                 GetNextXidAndEpoch(&nextXid, &nextEpoch);
571
572                 /*
573                  * Epoch of oldestXmin should be same as standby or
574                  * if the counter has wrapped, then one less than reply.
575                  */
576                 if (reply.xmin <= nextXid)
577                 {
578                         if (reply.epoch == nextEpoch)
579                                 epochOK = true;
580                 }
581                 else
582                 {
583                         if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
584                                 epochOK = true;
585                 }
586
587                 /*
588                  * Feedback from standby must not go backwards, nor should it go
589                  * forwards further than our most recent xid.
590                  */
591                 if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
592                 {
593                         if (!TransactionIdIsValid(MyProc->xmin))
594                         {
595                                 TransactionId oldestXmin = GetOldestXmin(true, true);
596                                 if (TransactionIdPrecedes(oldestXmin, reply.xmin))
597                                         newxmin = reply.xmin;
598                                 else
599                                         newxmin = oldestXmin;
600                         }
601                         else
602                         {
603                                 if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
604                                         newxmin = reply.xmin;
605                                 else
606                                         newxmin = MyProc->xmin; /* stay the same */
607                         }
608                 }
609         }
610
611         /*
612          * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
613          */
614         if (MyProc->xmin != newxmin)
615         {
616                 LWLockAcquire(ProcArrayLock, LW_SHARED);
617                 MyProc->xmin = newxmin;
618                 LWLockRelease(ProcArrayLock);
619         }
620 }
621
622 /* Main loop of walsender process */
623 static int
624 WalSndLoop(void)
625 {
626         char       *output_message;
627         bool            caughtup = false;
628
629         /*
630          * Allocate buffer that will be used for each output message.  We do this
631          * just once to reduce palloc overhead.  The buffer must be made large
632          * enough for maximum-sized messages.
633          */
634         output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
635
636         /*
637          * Allocate buffer that will be used for processing reply messages.  As
638          * above, do this just once to reduce palloc overhead.
639          */
640         initStringInfo(&reply_message);
641
642         /* Loop forever, unless we get an error */
643         for (;;)
644         {
645                 /*
646                  * Emergency bailout if postmaster has died.  This is to avoid the
647                  * necessity for manual cleanup of all postmaster children.
648                  */
649                 if (!PostmasterIsAlive(true))
650                         exit(1);
651
652                 /* Process any requests or signals received recently */
653                 if (got_SIGHUP)
654                 {
655                         got_SIGHUP = false;
656                         ProcessConfigFile(PGC_SIGHUP);
657                 }
658
659                 /*
660                  * When SIGUSR2 arrives, we send all outstanding logs up to the
661                  * shutdown checkpoint record (i.e., the latest record) and exit.
662                  */
663                 if (walsender_ready_to_stop)
664                 {
665                         if (!XLogSend(output_message, &caughtup))
666                                 break;
667                         ProcessRepliesIfAny();
668                         if (caughtup)
669                                 walsender_shutdown_requested = true;
670                 }
671
672                 /* Normal exit from the walsender is here */
673                 if (walsender_shutdown_requested)
674                 {
675                         /* Inform the standby that XLOG streaming was done */
676                         pq_puttextmessage('C', "COPY 0");
677                         pq_flush();
678
679                         proc_exit(0);
680                 }
681
682                 /*
683                  * If we had sent all accumulated WAL in last round, nap for the
684                  * configured time before retrying.
685                  */
686                 if (caughtup)
687                 {
688                         /*
689                          * Even if we wrote all the WAL that was available when we started
690                          * sending, more might have arrived while we were sending this
691                          * batch. We had the latch set while sending, so we have not
692                          * received any signals from that time. Let's arm the latch
693                          * again, and after that check that we're still up-to-date.
694                          */
695                         ResetLatch(&MyWalSnd->latch);
696
697                         if (!XLogSend(output_message, &caughtup))
698                                 break;
699                         if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
700                         {
701                                 /*
702                                  * XXX: We don't really need the periodic wakeups anymore,
703                                  * WaitLatchOrSocket should reliably wake up as soon as
704                                  * something interesting happens.
705                                  */
706
707                                 /* Sleep */
708                                 WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
709                                                                   WalSndDelay * 1000L);
710                         }
711                 }
712                 else
713                 {
714                         /* Attempt to send the log once every loop */
715                         if (!XLogSend(output_message, &caughtup))
716                                 break;
717                 }
718
719                 /* Update our state to indicate if we're behind or not */
720                 WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
721                 ProcessRepliesIfAny();
722         }
723
724         /*
725          * Get here on send failure.  Clean up and exit.
726          *
727          * Reset whereToSendOutput to prevent ereport from attempting to send any
728          * more messages to the standby.
729          */
730         if (whereToSendOutput == DestRemote)
731                 whereToSendOutput = DestNone;
732
733         proc_exit(0);
734         return 1;                                       /* keep the compiler quiet */
735 }
736
737 /* Initialize a per-walsender data structure for this walsender process */
738 static void
739 InitWalSnd(void)
740 {
741         int                     i;
742
743         /*
744          * WalSndCtl should be set up already (we inherit this by fork() or
745          * EXEC_BACKEND mechanism from the postmaster).
746          */
747         Assert(WalSndCtl != NULL);
748         Assert(MyWalSnd == NULL);
749
750         /*
751          * Find a free walsender slot and reserve it. If this fails, we must be
752          * out of WalSnd structures.
753          */
754         for (i = 0; i < max_wal_senders; i++)
755         {
756                 /* use volatile pointer to prevent code rearrangement */
757                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
758
759                 SpinLockAcquire(&walsnd->mutex);
760
761                 if (walsnd->pid != 0)
762                 {
763                         SpinLockRelease(&walsnd->mutex);
764                         continue;
765                 }
766                 else
767                 {
768                         /*
769                          * Found a free slot. Reserve it for us.
770                          */
771                         walsnd->pid = MyProcPid;
772                         MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
773                         walsnd->state = WALSNDSTATE_STARTUP;
774                         SpinLockRelease(&walsnd->mutex);
775                         /* don't need the lock anymore */
776                         OwnLatch((Latch *) &walsnd->latch);
777                         MyWalSnd = (WalSnd *) walsnd;
778
779                         break;
780                 }
781         }
782         if (MyWalSnd == NULL)
783                 ereport(FATAL,
784                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
785                                  errmsg("number of requested standby connections "
786                                                 "exceeds max_wal_senders (currently %d)",
787                                                 max_wal_senders)));
788
789         /* Arrange to clean up at walsender exit */
790         on_shmem_exit(WalSndKill, 0);
791 }
792
793 /* Destroy the per-walsender data structure for this walsender process */
794 static void
795 WalSndKill(int code, Datum arg)
796 {
797         Assert(MyWalSnd != NULL);
798
799         /*
800          * Mark WalSnd struct no longer in use. Assume that no lock is required
801          * for this.
802          */
803         MyWalSnd->pid = 0;
804         DisownLatch(&MyWalSnd->latch);
805
806         /* WalSnd struct isn't mine anymore */
807         MyWalSnd = NULL;
808 }
809
810 /*
811  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
812  *
813  * XXX probably this should be improved to suck data directly from the
814  * WAL buffers when possible.
815  *
816  * Will open, and keep open, one WAL segment stored in the global file
817  * descriptor sendFile. This means if XLogRead is used once, there will
818  * always be one descriptor left open until the process ends, but never
819  * more than one.
820  */
821 void
822 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
823 {
824         XLogRecPtr      startRecPtr = recptr;
825         char            path[MAXPGPATH];
826         uint32          lastRemovedLog;
827         uint32          lastRemovedSeg;
828         uint32          log;
829         uint32          seg;
830
831         while (nbytes > 0)
832         {
833                 uint32          startoff;
834                 int                     segbytes;
835                 int                     readbytes;
836
837                 startoff = recptr.xrecoff % XLogSegSize;
838
839                 if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
840                 {
841                         /* Switch to another logfile segment */
842                         if (sendFile >= 0)
843                                 close(sendFile);
844
845                         XLByteToSeg(recptr, sendId, sendSeg);
846                         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
847
848                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
849                         if (sendFile < 0)
850                         {
851                                 /*
852                                  * If the file is not found, assume it's because the standby
853                                  * asked for a too old WAL segment that has already been
854                                  * removed or recycled.
855                                  */
856                                 if (errno == ENOENT)
857                                 {
858                                         char            filename[MAXFNAMELEN];
859
860                                         XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
861                                         ereport(ERROR,
862                                                         (errcode_for_file_access(),
863                                                          errmsg("requested WAL segment %s has already been removed",
864                                                                         filename)));
865                                 }
866                                 else
867                                         ereport(ERROR,
868                                                         (errcode_for_file_access(),
869                                                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
870                                                                         path, sendId, sendSeg)));
871                         }
872                         sendOff = 0;
873                 }
874
875                 /* Need to seek in the file? */
876                 if (sendOff != startoff)
877                 {
878                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
879                                 ereport(ERROR,
880                                                 (errcode_for_file_access(),
881                                                  errmsg("could not seek in log file %u, segment %u to offset %u: %m",
882                                                                 sendId, sendSeg, startoff)));
883                         sendOff = startoff;
884                 }
885
886                 /* How many bytes are within this segment? */
887                 if (nbytes > (XLogSegSize - startoff))
888                         segbytes = XLogSegSize - startoff;
889                 else
890                         segbytes = nbytes;
891
892                 readbytes = read(sendFile, buf, segbytes);
893                 if (readbytes <= 0)
894                         ereport(ERROR,
895                                         (errcode_for_file_access(),
896                         errmsg("could not read from log file %u, segment %u, offset %u, "
897                                    "length %lu: %m",
898                                    sendId, sendSeg, sendOff, (unsigned long) segbytes)));
899
900                 /* Update state for read */
901                 XLByteAdvance(recptr, readbytes);
902
903                 sendOff += readbytes;
904                 nbytes -= readbytes;
905                 buf += readbytes;
906         }
907
908         /*
909          * After reading into the buffer, check that what we read was valid. We do
910          * this after reading, because even though the segment was present when we
911          * opened it, it might get recycled or removed while we read it. The
912          * read() succeeds in that case, but the data we tried to read might
913          * already have been overwritten with new WAL records.
914          */
915         XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
916         XLByteToSeg(startRecPtr, log, seg);
917         if (log < lastRemovedLog ||
918                 (log == lastRemovedLog && seg <= lastRemovedSeg))
919         {
920                 char            filename[MAXFNAMELEN];
921
922                 XLogFileName(filename, ThisTimeLineID, log, seg);
923                 ereport(ERROR,
924                                 (errcode_for_file_access(),
925                                  errmsg("requested WAL segment %s has already been removed",
926                                                 filename)));
927         }
928 }
929
930 /*
931  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
932  * but not yet sent to the client, and send it.
933  *
934  * msgbuf is a work area in which the output message is constructed.  It's
935  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
936  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
937  *
938  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
939  * *caughtup is set to false.
940  *
941  * Returns true if OK, false if trouble.
942  */
943 static bool
944 XLogSend(char *msgbuf, bool *caughtup)
945 {
946         XLogRecPtr      SendRqstPtr;
947         XLogRecPtr      startptr;
948         XLogRecPtr      endptr;
949         Size            nbytes;
950         WalDataMessageHeader msghdr;
951
952         /*
953          * Attempt to send all data that's already been written out and fsync'd to
954          * disk.  We cannot go further than what's been written out given the
955          * current implementation of XLogRead().  And in any case it's unsafe to
956          * send WAL that is not securely down to disk on the master: if the master
957          * subsequently crashes and restarts, slaves must not have applied any WAL
958          * that gets lost on the master.
959          */
960         SendRqstPtr = GetFlushRecPtr();
961
962         /* Quick exit if nothing to do */
963         if (XLByteLE(SendRqstPtr, sentPtr))
964         {
965                 *caughtup = true;
966                 return true;
967         }
968
969         /*
970          * Figure out how much to send in one message. If there's no more than
971          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
972          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
973          *
974          * The rounding is not only for performance reasons. Walreceiver relies on
975          * the fact that we never split a WAL record across two messages. Since a
976          * long WAL record is split at page boundary into continuation records,
977          * page boundary is always a safe cut-off point. We also assume that
978          * SendRqstPtr never points to the middle of a WAL record.
979          */
980         startptr = sentPtr;
981         if (startptr.xrecoff >= XLogFileSize)
982         {
983                 /*
984                  * crossing a logid boundary, skip the non-existent last log segment
985                  * in previous logical log file.
986                  */
987                 startptr.xlogid += 1;
988                 startptr.xrecoff = 0;
989         }
990
991         endptr = startptr;
992         XLByteAdvance(endptr, MAX_SEND_SIZE);
993         if (endptr.xlogid != startptr.xlogid)
994         {
995                 /* Don't cross a logfile boundary within one message */
996                 Assert(endptr.xlogid == startptr.xlogid + 1);
997                 endptr.xlogid = startptr.xlogid;
998                 endptr.xrecoff = XLogFileSize;
999         }
1000
1001         /* if we went beyond SendRqstPtr, back off */
1002         if (XLByteLE(SendRqstPtr, endptr))
1003         {
1004                 endptr = SendRqstPtr;
1005                 *caughtup = true;
1006         }
1007         else
1008         {
1009                 /* round down to page boundary. */
1010                 endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
1011                 *caughtup = false;
1012         }
1013
1014         nbytes = endptr.xrecoff - startptr.xrecoff;
1015         Assert(nbytes <= MAX_SEND_SIZE);
1016
1017         /*
1018          * OK to read and send the slice.
1019          */
1020         msgbuf[0] = 'w';
1021
1022         /*
1023          * Read the log directly into the output buffer to avoid extra memcpy
1024          * calls.
1025          */
1026         XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
1027
1028         /*
1029          * We fill the message header last so that the send timestamp is taken as
1030          * late as possible.
1031          */
1032         msghdr.dataStart = startptr;
1033         msghdr.walEnd = SendRqstPtr;
1034         msghdr.sendTime = GetCurrentTimestamp();
1035
1036         memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
1037
1038         pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
1039
1040         /* Flush pending output to the client */
1041         if (pq_flush())
1042                 return false;
1043
1044         sentPtr = endptr;
1045
1046         /* Update shared memory status */
1047         {
1048                 /* use volatile pointer to prevent code rearrangement */
1049                 volatile WalSnd *walsnd = MyWalSnd;
1050
1051                 SpinLockAcquire(&walsnd->mutex);
1052                 walsnd->sentPtr = sentPtr;
1053                 SpinLockRelease(&walsnd->mutex);
1054         }
1055
1056         /* Report progress of XLOG streaming in PS display */
1057         if (update_process_title)
1058         {
1059                 char            activitymsg[50];
1060
1061                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1062                                  sentPtr.xlogid, sentPtr.xrecoff);
1063                 set_ps_display(activitymsg, false);
1064         }
1065
1066         return true;
1067 }
1068
1069 /* SIGHUP: set flag to re-read config file at next convenient time */
1070 static void
1071 WalSndSigHupHandler(SIGNAL_ARGS)
1072 {
1073         got_SIGHUP = true;
1074         if (MyWalSnd)
1075                 SetLatch(&MyWalSnd->latch);
1076 }
1077
1078 /* SIGTERM: set flag to shut down */
1079 static void
1080 WalSndShutdownHandler(SIGNAL_ARGS)
1081 {
1082         walsender_shutdown_requested = true;
1083         if (MyWalSnd)
1084                 SetLatch(&MyWalSnd->latch);
1085 }
1086
1087 /*
1088  * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
1089  *
1090  * Some backend has bought the farm,
1091  * so we need to stop what we're doing and exit.
1092  */
1093 static void
1094 WalSndQuickDieHandler(SIGNAL_ARGS)
1095 {
1096         PG_SETMASK(&BlockSig);
1097
1098         /*
1099          * We DO NOT want to run proc_exit() callbacks -- we're here because
1100          * shared memory may be corrupted, so we don't want to try to clean up our
1101          * transaction.  Just nail the windows shut and get out of town.  Now that
1102          * there's an atexit callback to prevent third-party code from breaking
1103          * things by calling exit() directly, we have to reset the callbacks
1104          * explicitly to make this work as intended.
1105          */
1106         on_exit_reset();
1107
1108         /*
1109          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
1110          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
1111          * backend.  This is necessary precisely because we don't clean up our
1112          * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
1113          * should ensure the postmaster sees this as a crash, too, but no harm in
1114          * being doubly sure.)
1115          */
1116         exit(2);
1117 }
1118
/*
 * SIGUSR1: set flag to send WAL records.
 *
 * All of the work is delegated to the latch module's SIGUSR1 handler.  errno
 * is saved and restored around that call so that whatever code was
 * interrupted does not see a value changed by it.
 */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
	int			save_errno = errno;

	latch_sigusr1_handler();

	errno = save_errno;
}
1125
1126 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1127 static void
1128 WalSndLastCycleHandler(SIGNAL_ARGS)
1129 {
1130         walsender_ready_to_stop = true;
1131         if (MyWalSnd)
1132                 SetLatch(&MyWalSnd->latch);
1133 }
1134
1135 /* Set up signal handlers */
1136 void
1137 WalSndSignals(void)
1138 {
1139         /* Set up signal handlers */
1140         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
1141                                                                                                  * file */
1142         pqsignal(SIGINT, SIG_IGN);      /* not used */
1143         pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
1144         pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
1145         pqsignal(SIGALRM, SIG_IGN);
1146         pqsignal(SIGPIPE, SIG_IGN);
1147         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
1148         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
1149                                                                                                  * shutdown */
1150
1151         /* Reset some signals that are accepted by postmaster but not here */
1152         pqsignal(SIGCHLD, SIG_DFL);
1153         pqsignal(SIGTTIN, SIG_DFL);
1154         pqsignal(SIGTTOU, SIG_DFL);
1155         pqsignal(SIGCONT, SIG_DFL);
1156         pqsignal(SIGWINCH, SIG_DFL);
1157 }
1158
1159 /* Report shared-memory space needed by WalSndShmemInit */
1160 Size
1161 WalSndShmemSize(void)
1162 {
1163         Size            size = 0;
1164
1165         size = offsetof(WalSndCtlData, walsnds);
1166         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1167
1168         return size;
1169 }
1170
1171 /* Allocate and initialize walsender-related shared memory */
1172 void
1173 WalSndShmemInit(void)
1174 {
1175         bool            found;
1176         int                     i;
1177
1178         WalSndCtl = (WalSndCtlData *)
1179                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1180
1181         if (!found)
1182         {
1183                 /* First time through, so initialize */
1184                 MemSet(WalSndCtl, 0, WalSndShmemSize());
1185
1186                 for (i = 0; i < max_wal_senders; i++)
1187                 {
1188                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
1189
1190                         SpinLockInit(&walsnd->mutex);
1191                         InitSharedLatch(&walsnd->latch);
1192                 }
1193         }
1194 }
1195
1196 /* Wake up all walsenders */
1197 void
1198 WalSndWakeup(void)
1199 {
1200         int             i;
1201
1202         for (i = 0; i < max_wal_senders; i++)
1203                 SetLatch(&WalSndCtl->walsnds[i].latch);
1204 }
1205
1206 /* Set state for current walsender (only called in walsender) */
1207 void
1208 WalSndSetState(WalSndState state)
1209 {
1210         /* use volatile pointer to prevent code rearrangement */
1211         volatile WalSnd *walsnd = MyWalSnd;
1212
1213         Assert(am_walsender);
1214
1215         if (walsnd->state == state)
1216                 return;
1217
1218         SpinLockAcquire(&walsnd->mutex);
1219         walsnd->state = state;
1220         SpinLockRelease(&walsnd->mutex);
1221 }
1222
1223 /*
1224  * Return a string constant representing the state. This is used
1225  * in system views, and should *not* be translated.
1226  */
1227 static const char *
1228 WalSndGetStateString(WalSndState state)
1229 {
1230         switch (state)
1231         {
1232                 case WALSNDSTATE_STARTUP:
1233                         return "STARTUP";
1234                 case WALSNDSTATE_BACKUP:
1235                         return "BACKUP";
1236                 case WALSNDSTATE_CATCHUP:
1237                         return "CATCHUP";
1238                 case WALSNDSTATE_STREAMING:
1239                         return "STREAMING";
1240         }
1241         return "UNKNOWN";
1242 }
1243
1244
1245 /*
1246  * Returns activity of walsenders, including pids and xlog locations sent to
1247  * standby servers.
1248  */
1249 Datum
1250 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1251 {
1252 #define PG_STAT_GET_WAL_SENDERS_COLS    6
1253         ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1254         TupleDesc                       tupdesc;
1255         Tuplestorestate    *tupstore;
1256         MemoryContext           per_query_ctx;
1257         MemoryContext           oldcontext;
1258         int                                     i;
1259
1260         /* check to see if caller supports us returning a tuplestore */
1261         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1262                 ereport(ERROR,
1263                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1264                                  errmsg("set-valued function called in context that cannot accept a set")));
1265         if (!(rsinfo->allowedModes & SFRM_Materialize))
1266                 ereport(ERROR,
1267                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1268                                  errmsg("materialize mode required, but it is not " \
1269                                                 "allowed in this context")));
1270
1271         /* Build a tuple descriptor for our result type */
1272         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1273                 elog(ERROR, "return type must be a row type");
1274
1275         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1276         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1277
1278         tupstore = tuplestore_begin_heap(true, false, work_mem);
1279         rsinfo->returnMode = SFRM_Materialize;
1280         rsinfo->setResult = tupstore;
1281         rsinfo->setDesc = tupdesc;
1282
1283         MemoryContextSwitchTo(oldcontext);
1284
1285         for (i = 0; i < max_wal_senders; i++)
1286         {
1287                 /* use volatile pointer to prevent code rearrangement */
1288                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1289                 char            location[MAXFNAMELEN];
1290                 XLogRecPtr      sentPtr;
1291                 XLogRecPtr      write;
1292                 XLogRecPtr      flush;
1293                 XLogRecPtr      apply;
1294                 WalSndState     state;
1295                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1296                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1297
1298                 if (walsnd->pid == 0)
1299                         continue;
1300
1301                 SpinLockAcquire(&walsnd->mutex);
1302                 sentPtr = walsnd->sentPtr;
1303                 state = walsnd->state;
1304                 write = walsnd->write;
1305                 flush = walsnd->flush;
1306                 apply = walsnd->apply;
1307                 SpinLockRelease(&walsnd->mutex);
1308
1309                 memset(nulls, 0, sizeof(nulls));
1310                 values[0] = Int32GetDatum(walsnd->pid);
1311
1312                 if (!superuser())
1313                 {
1314                         /*
1315                          * Only superusers can see details. Other users only get
1316                          * the pid value to know it's a walsender, but no details.
1317                          */
1318                         nulls[1] = true;
1319                         nulls[2] = true;
1320                         nulls[3] = true;
1321                         nulls[4] = true;
1322                         nulls[5] = true;
1323                 }
1324                 else
1325                 {
1326                         values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1327
1328                         snprintf(location, sizeof(location), "%X/%X",
1329                                          sentPtr.xlogid, sentPtr.xrecoff);
1330                         values[2] = CStringGetTextDatum(location);
1331
1332                         if (write.xlogid == 0 && write.xrecoff == 0)
1333                                 nulls[3] = true;
1334                         snprintf(location, sizeof(location), "%X/%X",
1335                                          write.xlogid, write.xrecoff);
1336                         values[3] = CStringGetTextDatum(location);
1337
1338                         if (flush.xlogid == 0 && flush.xrecoff == 0)
1339                                 nulls[4] = true;
1340                         snprintf(location, sizeof(location), "%X/%X",
1341                                         flush.xlogid, flush.xrecoff);
1342                         values[4] = CStringGetTextDatum(location);
1343
1344                         if (apply.xlogid == 0 && apply.xrecoff == 0)
1345                                 nulls[5] = true;
1346                         snprintf(location, sizeof(location), "%X/%X",
1347                                          apply.xlogid, apply.xrecoff);
1348                         values[5] = CStringGetTextDatum(location);
1349                 }
1350
1351                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1352         }
1353
1354         /* clean up and return the tuplestore */
1355         tuplestore_donestoring(tupstore);
1356
1357         return (Datum) 0;
1358 }
1359
/*
 * This isn't currently used for anything. Monitoring tools might find it
 * useful in the future, and synchronous replication will need something
 * like it.
 */
#ifdef NOT_USED
/*
 * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
 * if none.
 *
 * Slots that are not in use (pid is zero) and slots whose sent position is
 * still 0/0 do not take part in the comparison.
 */
XLogRecPtr
GetOldestWALSendPointer(void)
{
	XLogRecPtr	result = {0, 0};
	bool		have_result = false;
	int			slotno;

	for (slotno = 0; slotno < max_wal_senders; slotno++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *slot = &WalSndCtl->walsnds[slotno];
		XLogRecPtr	candidate;

		if (slot->pid == 0)
			continue;

		/* Copy the position out under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		candidate = slot->sentPtr;
		SpinLockRelease(&slot->mutex);

		if (candidate.xlogid == 0 && candidate.xrecoff == 0)
			continue;

		/* The first usable position always wins; later ones only if older */
		if (!have_result || XLByteLT(candidate, result))
			result = candidate;
		have_result = true;
	}

	return result;
}

#endif