]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlogreader.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / access / transam / xlogreader.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *              Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *              src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *              See xlogreader.h for more notes on this facility.
13  *
14  *              This file is compiled as both front-end and backend code, so it
15  *              may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27
28 #ifndef FRONTEND
29 #include "miscadmin.h"
30 #include "utils/memutils.h"
31 #endif
32
33
34 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
35                         pg_attribute_printf(2, 3);
36 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
37 static int      ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
38                                                          int reqLen);
39 static void XLogReaderInvalReadState(XLogReaderState *state);
40 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
41                                                                   XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
42 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
43                                                         XLogRecPtr recptr);
44 static void ResetDecoder(XLogReaderState *state);
45
46 /* size of the buffer allocated for error message. */
47 #define MAX_ERRORMSG_LEN 1000
48
49 /*
50  * Construct a string in state->errormsg_buf explaining what's wrong with
51  * the current record being read.
52  */
53 static void
54 report_invalid_record(XLogReaderState *state, const char *fmt,...)
55 {
56         va_list         args;
57
58         fmt = _(fmt);
59
60         va_start(args, fmt);
61         vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
62         va_end(args);
63 }
64
65 /*
66  * Allocate and initialize a new XLogReader.
67  *
68  * Returns NULL if the xlogreader couldn't be allocated.
69  */
70 XLogReaderState *
71 XLogReaderAllocate(int wal_segment_size, const char *waldir,
72                                    XLogPageReadCB pagereadfunc, void *private_data)
73 {
74         XLogReaderState *state;
75
76         state = (XLogReaderState *)
77                 palloc_extended(sizeof(XLogReaderState),
78                                                 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
79         if (!state)
80                 return NULL;
81
82         state->max_block_id = -1;
83
84         /*
85          * Permanently allocate readBuf.  We do it this way, rather than just
86          * making a static array, for two reasons: (1) no need to waste the
87          * storage in most instantiations of the backend; (2) a static char array
88          * isn't guaranteed to have any particular alignment, whereas
89          * palloc_extended() will provide MAXALIGN'd storage.
90          */
91         state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
92                                                                                           MCXT_ALLOC_NO_OOM);
93         if (!state->readBuf)
94         {
95                 pfree(state);
96                 return NULL;
97         }
98
99         /* Initialize segment info. */
100         WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
101                                            waldir);
102
103         state->read_page = pagereadfunc;
104         /* system_identifier initialized to zeroes above */
105         state->private_data = private_data;
106         /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
107         state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
108                                                                                   MCXT_ALLOC_NO_OOM);
109         if (!state->errormsg_buf)
110         {
111                 pfree(state->readBuf);
112                 pfree(state);
113                 return NULL;
114         }
115         state->errormsg_buf[0] = '\0';
116
117         /*
118          * Allocate an initial readRecordBuf of minimal size, which can later be
119          * enlarged if necessary.
120          */
121         if (!allocate_recordbuf(state, 0))
122         {
123                 pfree(state->errormsg_buf);
124                 pfree(state->readBuf);
125                 pfree(state);
126                 return NULL;
127         }
128
129         return state;
130 }
131
132 void
133 XLogReaderFree(XLogReaderState *state)
134 {
135         int                     block_id;
136
137         for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
138         {
139                 if (state->blocks[block_id].data)
140                         pfree(state->blocks[block_id].data);
141         }
142         if (state->main_data)
143                 pfree(state->main_data);
144
145         pfree(state->errormsg_buf);
146         if (state->readRecordBuf)
147                 pfree(state->readRecordBuf);
148         pfree(state->readBuf);
149         pfree(state);
150 }
151
152 /*
153  * Allocate readRecordBuf to fit a record of at least the given length.
154  * Returns true if successful, false if out of memory.
155  *
156  * readRecordBufSize is set to the new buffer size.
157  *
158  * To avoid useless small increases, round its size to a multiple of
159  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
160  * with.  (That is enough for all "normal" records, but very large commit or
161  * abort records might need more space.)
162  */
163 static bool
164 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
165 {
166         uint32          newSize = reclength;
167
168         newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
169         newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
170
171 #ifndef FRONTEND
172
173         /*
174          * Note that in much unlucky circumstances, the random data read from a
175          * recycled segment can cause this routine to be called with a size
176          * causing a hard failure at allocation.  For a standby, this would cause
177          * the instance to stop suddenly with a hard failure, preventing it to
178          * retry fetching WAL from one of its sources which could allow it to move
179          * on with replay without a manual restart. If the data comes from a past
180          * recycled segment and is still valid, then the allocation may succeed
181          * but record checks are going to fail so this would be short-lived.  If
182          * the allocation fails because of a memory shortage, then this is not a
183          * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
184          */
185         if (!AllocSizeIsValid(newSize))
186                 return false;
187
188 #endif
189
190         if (state->readRecordBuf)
191                 pfree(state->readRecordBuf);
192         state->readRecordBuf =
193                 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
194         if (state->readRecordBuf == NULL)
195         {
196                 state->readRecordBufSize = 0;
197                 return false;
198         }
199         state->readRecordBufSize = newSize;
200         return true;
201 }
202
203 /*
204  * Initialize the passed segment structs.
205  */
206 void
207 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
208                                    int segsize, const char *waldir)
209 {
210         seg->ws_file = -1;
211         seg->ws_segno = 0;
212         seg->ws_off = 0;
213         seg->ws_tli = 0;
214
215         segcxt->ws_segsize = segsize;
216         if (waldir)
217                 snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
218 }
219
220 /*
221  * Attempt to read an XLOG record.
222  *
223  * If RecPtr is valid, try to read a record at that position.  Otherwise
224  * try to read a record just after the last one previously read.
225  *
226  * If the read_page callback fails to read the requested data, NULL is
227  * returned.  The callback is expected to have reported the error; errormsg
228  * is set to NULL.
229  *
230  * If the reading fails for some other reason, NULL is also returned, and
231  * *errormsg is set to a string with details of the failure.
232  *
233  * The returned pointer (or *errormsg) points to an internal buffer that's
234  * valid until the next call to XLogReadRecord.
235  */
236 XLogRecord *
237 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
238 {
239         XLogRecord *record;
240         XLogRecPtr      targetPagePtr;
241         bool            randAccess;
242         uint32          len,
243                                 total_len;
244         uint32          targetRecOff;
245         uint32          pageHeaderSize;
246         bool            gotheader;
247         int                     readOff;
248
249         /*
250          * randAccess indicates whether to verify the previous-record pointer of
251          * the record we're reading.  We only do this if we're reading
252          * sequentially, which is what we initially assume.
253          */
254         randAccess = false;
255
256         /* reset error state */
257         *errormsg = NULL;
258         state->errormsg_buf[0] = '\0';
259
260         ResetDecoder(state);
261
262         if (RecPtr == InvalidXLogRecPtr)
263         {
264                 /* No explicit start point; read the record after the one we just read */
265                 RecPtr = state->EndRecPtr;
266
267                 if (state->ReadRecPtr == InvalidXLogRecPtr)
268                         randAccess = true;
269
270                 /*
271                  * RecPtr is pointing to end+1 of the previous WAL record.  If we're
272                  * at a page boundary, no more records can fit on the current page. We
273                  * must skip over the page header, but we can't do that until we've
274                  * read in the page, since the header size is variable.
275                  */
276         }
277         else
278         {
279                 /*
280                  * Caller supplied a position to start at.
281                  *
282                  * In this case, the passed-in record pointer should already be
283                  * pointing to a valid record starting position.
284                  */
285                 Assert(XRecOffIsValid(RecPtr));
286                 randAccess = true;
287         }
288
289         state->currRecPtr = RecPtr;
290
291         targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
292         targetRecOff = RecPtr % XLOG_BLCKSZ;
293
294         /*
295          * Read the page containing the record into state->readBuf. Request enough
296          * byte to cover the whole record header, or at least the part of it that
297          * fits on the same page.
298          */
299         readOff = ReadPageInternal(state,
300                                                            targetPagePtr,
301                                                            Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
302         if (readOff < 0)
303                 goto err;
304
305         /*
306          * ReadPageInternal always returns at least the page header, so we can
307          * examine it now.
308          */
309         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
310         if (targetRecOff == 0)
311         {
312                 /*
313                  * At page start, so skip over page header.
314                  */
315                 RecPtr += pageHeaderSize;
316                 targetRecOff = pageHeaderSize;
317         }
318         else if (targetRecOff < pageHeaderSize)
319         {
320                 report_invalid_record(state, "invalid record offset at %X/%X",
321                                                           (uint32) (RecPtr >> 32), (uint32) RecPtr);
322                 goto err;
323         }
324
325         if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
326                 targetRecOff == pageHeaderSize)
327         {
328                 report_invalid_record(state, "contrecord is requested by %X/%X",
329                                                           (uint32) (RecPtr >> 32), (uint32) RecPtr);
330                 goto err;
331         }
332
333         /* ReadPageInternal has verified the page header */
334         Assert(pageHeaderSize <= readOff);
335
336         /*
337          * Read the record length.
338          *
339          * NB: Even though we use an XLogRecord pointer here, the whole record
340          * header might not fit on this page. xl_tot_len is the first field of the
341          * struct, so it must be on this page (the records are MAXALIGNed), but we
342          * cannot access any other fields until we've verified that we got the
343          * whole header.
344          */
345         record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
346         total_len = record->xl_tot_len;
347
348         /*
349          * If the whole record header is on this page, validate it immediately.
350          * Otherwise do just a basic sanity check on xl_tot_len, and validate the
351          * rest of the header after reading it from the next page.  The xl_tot_len
352          * check is necessary here to ensure that we enter the "Need to reassemble
353          * record" code path below; otherwise we might fail to apply
354          * ValidXLogRecordHeader at all.
355          */
356         if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
357         {
358                 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
359                                                                    randAccess))
360                         goto err;
361                 gotheader = true;
362         }
363         else
364         {
365                 /* XXX: more validation should be done here */
366                 if (total_len < SizeOfXLogRecord)
367                 {
368                         report_invalid_record(state,
369                                                                   "invalid record length at %X/%X: wanted %u, got %u",
370                                                                   (uint32) (RecPtr >> 32), (uint32) RecPtr,
371                                                                   (uint32) SizeOfXLogRecord, total_len);
372                         goto err;
373                 }
374                 gotheader = false;
375         }
376
377         len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
378         if (total_len > len)
379         {
380                 /* Need to reassemble record */
381                 char       *contdata;
382                 XLogPageHeader pageHeader;
383                 char       *buffer;
384                 uint32          gotlen;
385
386                 /*
387                  * Enlarge readRecordBuf as needed.
388                  */
389                 if (total_len > state->readRecordBufSize &&
390                         !allocate_recordbuf(state, total_len))
391                 {
392                         /* We treat this as a "bogus data" condition */
393                         report_invalid_record(state, "record length %u at %X/%X too long",
394                                                                   total_len,
395                                                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
396                         goto err;
397                 }
398
399                 /* Copy the first fragment of the record from the first page. */
400                 memcpy(state->readRecordBuf,
401                            state->readBuf + RecPtr % XLOG_BLCKSZ, len);
402                 buffer = state->readRecordBuf + len;
403                 gotlen = len;
404
405                 do
406                 {
407                         /* Calculate pointer to beginning of next page */
408                         targetPagePtr += XLOG_BLCKSZ;
409
410                         /* Wait for the next page to become available */
411                         readOff = ReadPageInternal(state, targetPagePtr,
412                                                                            Min(total_len - gotlen + SizeOfXLogShortPHD,
413                                                                                    XLOG_BLCKSZ));
414
415                         if (readOff < 0)
416                                 goto err;
417
418                         Assert(SizeOfXLogShortPHD <= readOff);
419
420                         /* Check that the continuation on next page looks valid */
421                         pageHeader = (XLogPageHeader) state->readBuf;
422                         if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
423                         {
424                                 report_invalid_record(state,
425                                                                           "there is no contrecord flag at %X/%X",
426                                                                           (uint32) (RecPtr >> 32), (uint32) RecPtr);
427                                 goto err;
428                         }
429
430                         /*
431                          * Cross-check that xlp_rem_len agrees with how much of the record
432                          * we expect there to be left.
433                          */
434                         if (pageHeader->xlp_rem_len == 0 ||
435                                 total_len != (pageHeader->xlp_rem_len + gotlen))
436                         {
437                                 report_invalid_record(state,
438                                                                           "invalid contrecord length %u at %X/%X",
439                                                                           pageHeader->xlp_rem_len,
440                                                                           (uint32) (RecPtr >> 32), (uint32) RecPtr);
441                                 goto err;
442                         }
443
444                         /* Append the continuation from this page to the buffer */
445                         pageHeaderSize = XLogPageHeaderSize(pageHeader);
446
447                         if (readOff < pageHeaderSize)
448                                 readOff = ReadPageInternal(state, targetPagePtr,
449                                                                                    pageHeaderSize);
450
451                         Assert(pageHeaderSize <= readOff);
452
453                         contdata = (char *) state->readBuf + pageHeaderSize;
454                         len = XLOG_BLCKSZ - pageHeaderSize;
455                         if (pageHeader->xlp_rem_len < len)
456                                 len = pageHeader->xlp_rem_len;
457
458                         if (readOff < pageHeaderSize + len)
459                                 readOff = ReadPageInternal(state, targetPagePtr,
460                                                                                    pageHeaderSize + len);
461
462                         memcpy(buffer, (char *) contdata, len);
463                         buffer += len;
464                         gotlen += len;
465
466                         /* If we just reassembled the record header, validate it. */
467                         if (!gotheader)
468                         {
469                                 record = (XLogRecord *) state->readRecordBuf;
470                                 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
471                                                                                    record, randAccess))
472                                         goto err;
473                                 gotheader = true;
474                         }
475                 } while (gotlen < total_len);
476
477                 Assert(gotheader);
478
479                 record = (XLogRecord *) state->readRecordBuf;
480                 if (!ValidXLogRecord(state, record, RecPtr))
481                         goto err;
482
483                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
484                 state->ReadRecPtr = RecPtr;
485                 state->EndRecPtr = targetPagePtr + pageHeaderSize
486                         + MAXALIGN(pageHeader->xlp_rem_len);
487         }
488         else
489         {
490                 /* Wait for the record data to become available */
491                 readOff = ReadPageInternal(state, targetPagePtr,
492                                                                    Min(targetRecOff + total_len, XLOG_BLCKSZ));
493                 if (readOff < 0)
494                         goto err;
495
496                 /* Record does not cross a page boundary */
497                 if (!ValidXLogRecord(state, record, RecPtr))
498                         goto err;
499
500                 state->EndRecPtr = RecPtr + MAXALIGN(total_len);
501
502                 state->ReadRecPtr = RecPtr;
503         }
504
505         /*
506          * Special processing if it's an XLOG SWITCH record
507          */
508         if (record->xl_rmid == RM_XLOG_ID &&
509                 (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
510         {
511                 /* Pretend it extends to end of segment */
512                 state->EndRecPtr += state->segcxt.ws_segsize - 1;
513                 state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
514         }
515
516         if (DecodeXLogRecord(state, record, errormsg))
517                 return record;
518         else
519                 return NULL;
520
521 err:
522
523         /*
524          * Invalidate the read state. We might read from a different source after
525          * failure.
526          */
527         XLogReaderInvalReadState(state);
528
529         if (state->errormsg_buf[0] != '\0')
530                 *errormsg = state->errormsg_buf;
531
532         return NULL;
533 }
534
535 /*
536  * Read a single xlog page including at least [pageptr, reqLen] of valid data
537  * via the read_page() callback.
538  *
539  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
540  * is set in that case (unless the error occurs in the read_page callback).
541  *
542  * We fetch the page from a reader-local cache if we know we have the required
543  * data and if there hasn't been any error since caching the data.
544  */
545 static int
546 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
547 {
548         int                     readLen;
549         uint32          targetPageOff;
550         XLogSegNo       targetSegNo;
551         XLogPageHeader hdr;
552
553         Assert((pageptr % XLOG_BLCKSZ) == 0);
554
555         XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
556         targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
557
558         /* check whether we have all the requested data already */
559         if (targetSegNo == state->seg.ws_segno &&
560                 targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
561                 return state->readLen;
562
563         /*
564          * Data is not in our buffer.
565          *
566          * Every time we actually read the page, even if we looked at parts of it
567          * before, we need to do verification as the read_page callback might now
568          * be rereading data from a different source.
569          *
570          * Whenever switching to a new WAL segment, we read the first page of the
571          * file and validate its header, even if that's not where the target
572          * record is.  This is so that we can check the additional identification
573          * info that is present in the first page's "long" header.
574          */
575         if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
576         {
577                 XLogRecPtr      targetSegmentPtr = pageptr - targetPageOff;
578
579                 readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
580                                                                    state->currRecPtr,
581                                                                    state->readBuf);
582                 if (readLen < 0)
583                         goto err;
584
585                 /* we can be sure to have enough WAL available, we scrolled back */
586                 Assert(readLen == XLOG_BLCKSZ);
587
588                 if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
589                                                                                   state->readBuf))
590                         goto err;
591         }
592
593         /*
594          * First, read the requested data length, but at least a short page header
595          * so that we can validate it.
596          */
597         readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
598                                                            state->currRecPtr,
599                                                            state->readBuf);
600         if (readLen < 0)
601                 goto err;
602
603         Assert(readLen <= XLOG_BLCKSZ);
604
605         /* Do we have enough data to check the header length? */
606         if (readLen <= SizeOfXLogShortPHD)
607                 goto err;
608
609         Assert(readLen >= reqLen);
610
611         hdr = (XLogPageHeader) state->readBuf;
612
613         /* still not enough */
614         if (readLen < XLogPageHeaderSize(hdr))
615         {
616                 readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
617                                                                    state->currRecPtr,
618                                                                    state->readBuf);
619                 if (readLen < 0)
620                         goto err;
621         }
622
623         /*
624          * Now that we know we have the full header, validate it.
625          */
626         if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
627                 goto err;
628
629         /* update read state information */
630         state->seg.ws_segno = targetSegNo;
631         state->seg.ws_off = targetPageOff;
632         state->readLen = readLen;
633
634         return readLen;
635
636 err:
637         XLogReaderInvalReadState(state);
638         return -1;
639 }
640
641 /*
642  * Invalidate the xlogreader's read state to force a re-read.
643  */
644 static void
645 XLogReaderInvalReadState(XLogReaderState *state)
646 {
647         state->seg.ws_segno = 0;
648         state->seg.ws_off = 0;
649         state->readLen = 0;
650 }
651
652 /*
653  * Validate an XLOG record header.
654  *
655  * This is just a convenience subroutine to avoid duplicated code in
656  * XLogReadRecord.  It's not intended for use from anywhere else.
657  */
658 static bool
659 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
660                                           XLogRecPtr PrevRecPtr, XLogRecord *record,
661                                           bool randAccess)
662 {
663         if (record->xl_tot_len < SizeOfXLogRecord)
664         {
665                 report_invalid_record(state,
666                                                           "invalid record length at %X/%X: wanted %u, got %u",
667                                                           (uint32) (RecPtr >> 32), (uint32) RecPtr,
668                                                           (uint32) SizeOfXLogRecord, record->xl_tot_len);
669                 return false;
670         }
671         if (record->xl_rmid > RM_MAX_ID)
672         {
673                 report_invalid_record(state,
674                                                           "invalid resource manager ID %u at %X/%X",
675                                                           record->xl_rmid, (uint32) (RecPtr >> 32),
676                                                           (uint32) RecPtr);
677                 return false;
678         }
679         if (randAccess)
680         {
681                 /*
682                  * We can't exactly verify the prev-link, but surely it should be less
683                  * than the record's own address.
684                  */
685                 if (!(record->xl_prev < RecPtr))
686                 {
687                         report_invalid_record(state,
688                                                                   "record with incorrect prev-link %X/%X at %X/%X",
689                                                                   (uint32) (record->xl_prev >> 32),
690                                                                   (uint32) record->xl_prev,
691                                                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
692                         return false;
693                 }
694         }
695         else
696         {
697                 /*
698                  * Record's prev-link should exactly match our previous location. This
699                  * check guards against torn WAL pages where a stale but valid-looking
700                  * WAL record starts on a sector boundary.
701                  */
702                 if (record->xl_prev != PrevRecPtr)
703                 {
704                         report_invalid_record(state,
705                                                                   "record with incorrect prev-link %X/%X at %X/%X",
706                                                                   (uint32) (record->xl_prev >> 32),
707                                                                   (uint32) record->xl_prev,
708                                                                   (uint32) (RecPtr >> 32), (uint32) RecPtr);
709                         return false;
710                 }
711         }
712
713         return true;
714 }
715
716
717 /*
718  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
719  * record (other than to the minimal extent of computing the amount of
720  * data to read in) until we've checked the CRCs.
721  *
722  * We assume all of the record (that is, xl_tot_len bytes) has been read
723  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
724  * record's header, which means in particular that xl_tot_len is at least
725  * SizeOfXLogRecord.
726  */
727 static bool
728 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
729 {
730         pg_crc32c       crc;
731
732         /* Calculate the CRC */
733         INIT_CRC32C(crc);
734         COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
735         /* include the record header last */
736         COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
737         FIN_CRC32C(crc);
738
739         if (!EQ_CRC32C(record->xl_crc, crc))
740         {
741                 report_invalid_record(state,
742                                                           "incorrect resource manager data checksum in record at %X/%X",
743                                                           (uint32) (recptr >> 32), (uint32) recptr);
744                 return false;
745         }
746
747         return true;
748 }
749
750 /*
751  * Validate a page header.
752  *
753  * Check if 'phdr' is valid as the header of the XLog page at position
754  * 'recptr'.
755  */
756 bool
757 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
758                                                          char *phdr)
759 {
760         XLogRecPtr      recaddr;
761         XLogSegNo       segno;
762         int32           offset;
763         XLogPageHeader hdr = (XLogPageHeader) phdr;
764
765         Assert((recptr % XLOG_BLCKSZ) == 0);
766
767         XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
768         offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
769
770         XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
771
772         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
773         {
774                 char            fname[MAXFNAMELEN];
775
776                 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
777
778                 report_invalid_record(state,
779                                                           "invalid magic number %04X in log segment %s, offset %u",
780                                                           hdr->xlp_magic,
781                                                           fname,
782                                                           offset);
783                 return false;
784         }
785
786         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
787         {
788                 char            fname[MAXFNAMELEN];
789
790                 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
791
792                 report_invalid_record(state,
793                                                           "invalid info bits %04X in log segment %s, offset %u",
794                                                           hdr->xlp_info,
795                                                           fname,
796                                                           offset);
797                 return false;
798         }
799
800         if (hdr->xlp_info & XLP_LONG_HEADER)
801         {
802                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
803
804                 if (state->system_identifier &&
805                         longhdr->xlp_sysid != state->system_identifier)
806                 {
807                         report_invalid_record(state,
808                                                                   "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
809                                                                   (unsigned long long) longhdr->xlp_sysid,
810                                                                   (unsigned long long) state->system_identifier);
811                         return false;
812                 }
813                 else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
814                 {
815                         report_invalid_record(state,
816                                                                   "WAL file is from different database system: incorrect segment size in page header");
817                         return false;
818                 }
819                 else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
820                 {
821                         report_invalid_record(state,
822                                                                   "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
823                         return false;
824                 }
825         }
826         else if (offset == 0)
827         {
828                 char            fname[MAXFNAMELEN];
829
830                 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
831
832                 /* hmm, first page of file doesn't have a long header? */
833                 report_invalid_record(state,
834                                                           "invalid info bits %04X in log segment %s, offset %u",
835                                                           hdr->xlp_info,
836                                                           fname,
837                                                           offset);
838                 return false;
839         }
840
841         /*
842          * Check that the address on the page agrees with what we expected. This
843          * check typically fails when an old WAL segment is recycled, and hasn't
844          * yet been overwritten with new data yet.
845          */
846         if (hdr->xlp_pageaddr != recaddr)
847         {
848                 char            fname[MAXFNAMELEN];
849
850                 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
851
852                 report_invalid_record(state,
853                                                           "unexpected pageaddr %X/%X in log segment %s, offset %u",
854                                                           (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
855                                                           fname,
856                                                           offset);
857                 return false;
858         }
859
860         /*
861          * Since child timelines are always assigned a TLI greater than their
862          * immediate parent's TLI, we should never see TLI go backwards across
863          * successive pages of a consistent WAL sequence.
864          *
865          * Sometimes we re-read a segment that's already been (partially) read. So
866          * we only verify TLIs for pages that are later than the last remembered
867          * LSN.
868          */
869         if (recptr > state->latestPagePtr)
870         {
871                 if (hdr->xlp_tli < state->latestPageTLI)
872                 {
873                         char            fname[MAXFNAMELEN];
874
875                         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
876
877                         report_invalid_record(state,
878                                                                   "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
879                                                                   hdr->xlp_tli,
880                                                                   state->latestPageTLI,
881                                                                   fname,
882                                                                   offset);
883                         return false;
884                 }
885         }
886         state->latestPagePtr = recptr;
887         state->latestPageTLI = hdr->xlp_tli;
888
889         return true;
890 }
891
892 #ifdef FRONTEND
893 /*
894  * Functions that are currently not needed in the backend, but are better
895  * implemented inside xlogreader.c because of the internal facilities available
896  * here.
897  */
898
899 /*
900  * Find the first record with an lsn >= RecPtr.
901  *
902  * Useful for checking whether RecPtr is a valid xlog address for reading, and
903  * to find the first valid address after some address when dumping records for
904  * debugging purposes.
905  */
906 XLogRecPtr
907 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
908 {
909         XLogReaderState saved_state = *state;
910         XLogRecPtr      tmpRecPtr;
911         XLogRecPtr      found = InvalidXLogRecPtr;
912         XLogPageHeader header;
913         char       *errormsg;
914
915         Assert(!XLogRecPtrIsInvalid(RecPtr));
916
917         /*
918          * skip over potential continuation data, keeping in mind that it may span
919          * multiple pages
920          */
921         tmpRecPtr = RecPtr;
922         while (true)
923         {
924                 XLogRecPtr      targetPagePtr;
925                 int                     targetRecOff;
926                 uint32          pageHeaderSize;
927                 int                     readLen;
928
929                 /*
930                  * Compute targetRecOff. It should typically be equal or greater than
931                  * short page-header since a valid record can't start anywhere before
932                  * that, except when caller has explicitly specified the offset that
933                  * falls somewhere there or when we are skipping multi-page
934                  * continuation record. It doesn't matter though because
935                  * ReadPageInternal() is prepared to handle that and will read at
936                  * least short page-header worth of data
937                  */
938                 targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
939
940                 /* scroll back to page boundary */
941                 targetPagePtr = tmpRecPtr - targetRecOff;
942
943                 /* Read the page containing the record */
944                 readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
945                 if (readLen < 0)
946                         goto err;
947
948                 header = (XLogPageHeader) state->readBuf;
949
950                 pageHeaderSize = XLogPageHeaderSize(header);
951
952                 /* make sure we have enough data for the page header */
953                 readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
954                 if (readLen < 0)
955                         goto err;
956
957                 /* skip over potential continuation data */
958                 if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
959                 {
960                         /*
961                          * If the length of the remaining continuation data is more than
962                          * what can fit in this page, the continuation record crosses over
963                          * this page. Read the next page and try again. xlp_rem_len in the
964                          * next page header will contain the remaining length of the
965                          * continuation data
966                          *
967                          * Note that record headers are MAXALIGN'ed
968                          */
969                         if (MAXALIGN(header->xlp_rem_len) > (XLOG_BLCKSZ - pageHeaderSize))
970                                 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
971                         else
972                         {
973                                 /*
974                                  * The previous continuation record ends in this page. Set
975                                  * tmpRecPtr to point to the first valid record
976                                  */
977                                 tmpRecPtr = targetPagePtr + pageHeaderSize
978                                         + MAXALIGN(header->xlp_rem_len);
979                                 break;
980                         }
981                 }
982                 else
983                 {
984                         tmpRecPtr = targetPagePtr + pageHeaderSize;
985                         break;
986                 }
987         }
988
989         /*
990          * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
991          * because either we're at the first record after the beginning of a page
992          * or we just jumped over the remaining data of a continuation.
993          */
994         while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
995         {
996                 /* continue after the record */
997                 tmpRecPtr = InvalidXLogRecPtr;
998
999                 /* past the record we've found, break out */
1000                 if (RecPtr <= state->ReadRecPtr)
1001                 {
1002                         found = state->ReadRecPtr;
1003                         goto out;
1004                 }
1005         }
1006
1007 err:
1008 out:
1009         /* Reset state to what we had before finding the record */
1010         state->ReadRecPtr = saved_state.ReadRecPtr;
1011         state->EndRecPtr = saved_state.EndRecPtr;
1012         XLogReaderInvalReadState(state);
1013
1014         return found;
1015 }
1016
1017 #endif                                                  /* FRONTEND */
1018
1019 /* ----------------------------------------
1020  * Functions for decoding the data and block references in a record.
1021  * ----------------------------------------
1022  */
1023
1024 /* private function to reset the state between records */
1025 static void
1026 ResetDecoder(XLogReaderState *state)
1027 {
1028         int                     block_id;
1029
1030         state->decoded_record = NULL;
1031
1032         state->main_data_len = 0;
1033
1034         for (block_id = 0; block_id <= state->max_block_id; block_id++)
1035         {
1036                 state->blocks[block_id].in_use = false;
1037                 state->blocks[block_id].has_image = false;
1038                 state->blocks[block_id].has_data = false;
1039                 state->blocks[block_id].apply_image = false;
1040         }
1041         state->max_block_id = -1;
1042 }
1043
1044 /*
1045  * Decode the previously read record.
1046  *
1047  * On error, a human-readable error message is returned in *errormsg, and
1048  * the return value is false.
1049  */
1050 bool
1051 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1052 {
1053         /*
1054          * read next _size bytes from record buffer, but check for overrun first.
1055          */
1056 #define COPY_HEADER_FIELD(_dst, _size)                  \
1057         do {                                                                            \
1058                 if (remaining < _size)                                  \
1059                         goto shortdata_err;                                     \
1060                 memcpy(_dst, ptr, _size);                               \
1061                 ptr += _size;                                                   \
1062                 remaining -= _size;                                             \
1063         } while(0)
1064
1065         char       *ptr;
1066         uint32          remaining;
1067         uint32          datatotal;
1068         RelFileNode *rnode = NULL;
1069         uint8           block_id;
1070
1071         ResetDecoder(state);
1072
1073         state->decoded_record = record;
1074         state->record_origin = InvalidRepOriginId;
1075
1076         ptr = (char *) record;
1077         ptr += SizeOfXLogRecord;
1078         remaining = record->xl_tot_len - SizeOfXLogRecord;
1079
1080         /* Decode the headers */
1081         datatotal = 0;
1082         while (remaining > datatotal)
1083         {
1084                 COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1085
1086                 if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1087                 {
1088                         /* XLogRecordDataHeaderShort */
1089                         uint8           main_data_len;
1090
1091                         COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1092
1093                         state->main_data_len = main_data_len;
1094                         datatotal += main_data_len;
1095                         break;                          /* by convention, the main data fragment is
1096                                                                  * always last */
1097                 }
1098                 else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1099                 {
1100                         /* XLogRecordDataHeaderLong */
1101                         uint32          main_data_len;
1102
1103                         COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1104                         state->main_data_len = main_data_len;
1105                         datatotal += main_data_len;
1106                         break;                          /* by convention, the main data fragment is
1107                                                                  * always last */
1108                 }
1109                 else if (block_id == XLR_BLOCK_ID_ORIGIN)
1110                 {
1111                         COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1112                 }
1113                 else if (block_id <= XLR_MAX_BLOCK_ID)
1114                 {
1115                         /* XLogRecordBlockHeader */
1116                         DecodedBkpBlock *blk;
1117                         uint8           fork_flags;
1118
1119                         if (block_id <= state->max_block_id)
1120                         {
1121                                 report_invalid_record(state,
1122                                                                           "out-of-order block_id %u at %X/%X",
1123                                                                           block_id,
1124                                                                           (uint32) (state->ReadRecPtr >> 32),
1125                                                                           (uint32) state->ReadRecPtr);
1126                                 goto err;
1127                         }
1128                         state->max_block_id = block_id;
1129
1130                         blk = &state->blocks[block_id];
1131                         blk->in_use = true;
1132                         blk->apply_image = false;
1133
1134                         COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1135                         blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1136                         blk->flags = fork_flags;
1137                         blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1138                         blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1139
1140                         COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1141                         /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1142                         if (blk->has_data && blk->data_len == 0)
1143                         {
1144                                 report_invalid_record(state,
1145                                                                           "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1146                                                                           (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1147                                 goto err;
1148                         }
1149                         if (!blk->has_data && blk->data_len != 0)
1150                         {
1151                                 report_invalid_record(state,
1152                                                                           "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1153                                                                           (unsigned int) blk->data_len,
1154                                                                           (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1155                                 goto err;
1156                         }
1157                         datatotal += blk->data_len;
1158
1159                         if (blk->has_image)
1160                         {
1161                                 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1162                                 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1163                                 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1164
1165                                 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1166
1167                                 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1168                                 {
1169                                         if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1170                                                 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1171                                         else
1172                                                 blk->hole_length = 0;
1173                                 }
1174                                 else
1175                                         blk->hole_length = BLCKSZ - blk->bimg_len;
1176                                 datatotal += blk->bimg_len;
1177
1178                                 /*
1179                                  * cross-check that hole_offset > 0, hole_length > 0 and
1180                                  * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1181                                  */
1182                                 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1183                                         (blk->hole_offset == 0 ||
1184                                          blk->hole_length == 0 ||
1185                                          blk->bimg_len == BLCKSZ))
1186                                 {
1187                                         report_invalid_record(state,
1188                                                                                   "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1189                                                                                   (unsigned int) blk->hole_offset,
1190                                                                                   (unsigned int) blk->hole_length,
1191                                                                                   (unsigned int) blk->bimg_len,
1192                                                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1193                                         goto err;
1194                                 }
1195
1196                                 /*
1197                                  * cross-check that hole_offset == 0 and hole_length == 0 if
1198                                  * the HAS_HOLE flag is not set.
1199                                  */
1200                                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1201                                         (blk->hole_offset != 0 || blk->hole_length != 0))
1202                                 {
1203                                         report_invalid_record(state,
1204                                                                                   "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1205                                                                                   (unsigned int) blk->hole_offset,
1206                                                                                   (unsigned int) blk->hole_length,
1207                                                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1208                                         goto err;
1209                                 }
1210
1211                                 /*
1212                                  * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1213                                  * flag is set.
1214                                  */
1215                                 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1216                                         blk->bimg_len == BLCKSZ)
1217                                 {
1218                                         report_invalid_record(state,
1219                                                                                   "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1220                                                                                   (unsigned int) blk->bimg_len,
1221                                                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1222                                         goto err;
1223                                 }
1224
1225                                 /*
1226                                  * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1227                                  * IS_COMPRESSED flag is set.
1228                                  */
1229                                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1230                                         !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1231                                         blk->bimg_len != BLCKSZ)
1232                                 {
1233                                         report_invalid_record(state,
1234                                                                                   "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1235                                                                                   (unsigned int) blk->data_len,
1236                                                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1237                                         goto err;
1238                                 }
1239                         }
1240                         if (!(fork_flags & BKPBLOCK_SAME_REL))
1241                         {
1242                                 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1243                                 rnode = &blk->rnode;
1244                         }
1245                         else
1246                         {
1247                                 if (rnode == NULL)
1248                                 {
1249                                         report_invalid_record(state,
1250                                                                                   "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1251                                                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1252                                         goto err;
1253                                 }
1254
1255                                 blk->rnode = *rnode;
1256                         }
1257                         COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1258                 }
1259                 else
1260                 {
1261                         report_invalid_record(state,
1262                                                                   "invalid block_id %u at %X/%X",
1263                                                                   block_id,
1264                                                                   (uint32) (state->ReadRecPtr >> 32),
1265                                                                   (uint32) state->ReadRecPtr);
1266                         goto err;
1267                 }
1268         }
1269
1270         if (remaining != datatotal)
1271                 goto shortdata_err;
1272
1273         /*
1274          * Ok, we've parsed the fragment headers, and verified that the total
1275          * length of the payload in the fragments is equal to the amount of data
1276          * left. Copy the data of each fragment to a separate buffer.
1277          *
1278          * We could just set up pointers into readRecordBuf, but we want to align
1279          * the data for the convenience of the callers. Backup images are not
1280          * copied, however; they don't need alignment.
1281          */
1282
1283         /* block data first */
1284         for (block_id = 0; block_id <= state->max_block_id; block_id++)
1285         {
1286                 DecodedBkpBlock *blk = &state->blocks[block_id];
1287
1288                 if (!blk->in_use)
1289                         continue;
1290
1291                 Assert(blk->has_image || !blk->apply_image);
1292
1293                 if (blk->has_image)
1294                 {
1295                         blk->bkp_image = ptr;
1296                         ptr += blk->bimg_len;
1297                 }
1298                 if (blk->has_data)
1299                 {
1300                         if (!blk->data || blk->data_len > blk->data_bufsz)
1301                         {
1302                                 if (blk->data)
1303                                         pfree(blk->data);
1304
1305                                 /*
1306                                  * Force the initial request to be BLCKSZ so that we don't
1307                                  * waste time with lots of trips through this stanza as a
1308                                  * result of WAL compression.
1309                                  */
1310                                 blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1311                                 blk->data = palloc(blk->data_bufsz);
1312                         }
1313                         memcpy(blk->data, ptr, blk->data_len);
1314                         ptr += blk->data_len;
1315                 }
1316         }
1317
1318         /* and finally, the main data */
1319         if (state->main_data_len > 0)
1320         {
1321                 if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1322                 {
1323                         if (state->main_data)
1324                                 pfree(state->main_data);
1325
1326                         /*
1327                          * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1328                          * types, we omit trailing struct padding on-disk to save a few
1329                          * bytes; but compilers may generate accesses to the xlog struct
1330                          * that assume that padding bytes are present.  If the palloc
1331                          * request is not large enough to include such padding bytes then
1332                          * we'll get valgrind complaints due to otherwise-harmless fetches
1333                          * of the padding bytes.
1334                          *
1335                          * In addition, force the initial request to be reasonably large
1336                          * so that we don't waste time with lots of trips through this
1337                          * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1338                          */
1339                         state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1340                                                                                                   BLCKSZ / 2));
1341                         state->main_data = palloc(state->main_data_bufsz);
1342                 }
1343                 memcpy(state->main_data, ptr, state->main_data_len);
1344                 ptr += state->main_data_len;
1345         }
1346
1347         return true;
1348
1349 shortdata_err:
1350         report_invalid_record(state,
1351                                                   "record with invalid length at %X/%X",
1352                                                   (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1353 err:
1354         *errormsg = state->errormsg_buf;
1355
1356         return false;
1357 }
1358
1359 /*
1360  * Returns information about the block that a block reference refers to.
1361  *
1362  * If the WAL record contains a block reference with the given ID, *rnode,
1363  * *forknum, and *blknum are filled in (if not NULL), and returns true.
1364  * Otherwise returns false.
1365  */
1366 bool
1367 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1368                                    RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1369 {
1370         DecodedBkpBlock *bkpb;
1371
1372         if (!record->blocks[block_id].in_use)
1373                 return false;
1374
1375         bkpb = &record->blocks[block_id];
1376         if (rnode)
1377                 *rnode = bkpb->rnode;
1378         if (forknum)
1379                 *forknum = bkpb->forknum;
1380         if (blknum)
1381                 *blknum = bkpb->blkno;
1382         return true;
1383 }
1384
1385 /*
1386  * Returns the data associated with a block reference, or NULL if there is
1387  * no data (e.g. because a full-page image was taken instead). The returned
1388  * pointer points to a MAXALIGNed buffer.
1389  */
1390 char *
1391 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1392 {
1393         DecodedBkpBlock *bkpb;
1394
1395         if (!record->blocks[block_id].in_use)
1396                 return NULL;
1397
1398         bkpb = &record->blocks[block_id];
1399
1400         if (!bkpb->has_data)
1401         {
1402                 if (len)
1403                         *len = 0;
1404                 return NULL;
1405         }
1406         else
1407         {
1408                 if (len)
1409                         *len = bkpb->data_len;
1410                 return bkpb->data;
1411         }
1412 }
1413
1414 /*
1415  * Restore a full-page image from a backup block attached to an XLOG record.
1416  *
1417  * Returns the buffer number containing the page.
1418  */
1419 bool
1420 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1421 {
1422         DecodedBkpBlock *bkpb;
1423         char       *ptr;
1424         PGAlignedBlock tmp;
1425
1426         if (!record->blocks[block_id].in_use)
1427                 return false;
1428         if (!record->blocks[block_id].has_image)
1429                 return false;
1430
1431         bkpb = &record->blocks[block_id];
1432         ptr = bkpb->bkp_image;
1433
1434         if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1435         {
1436                 /* If a backup block image is compressed, decompress it */
1437                 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1438                                                         BLCKSZ - bkpb->hole_length, true) < 0)
1439                 {
1440                         report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1441                                                                   (uint32) (record->ReadRecPtr >> 32),
1442                                                                   (uint32) record->ReadRecPtr,
1443                                                                   block_id);
1444                         return false;
1445                 }
1446                 ptr = tmp.data;
1447         }
1448
1449         /* generate page, taking into account hole if necessary */
1450         if (bkpb->hole_length == 0)
1451         {
1452                 memcpy(page, ptr, BLCKSZ);
1453         }
1454         else
1455         {
1456                 memcpy(page, ptr, bkpb->hole_offset);
1457                 /* must zero-fill the hole */
1458                 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1459                 memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1460                            ptr + bkpb->hole_offset,
1461                            BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1462         }
1463
1464         return true;
1465 }
1466
1467 #ifndef FRONTEND
1468
1469 /*
1470  * Extract the FullTransactionId from a WAL record.
1471  */
1472 FullTransactionId
1473 XLogRecGetFullXid(XLogReaderState *record)
1474 {
1475         TransactionId   xid,
1476                                         next_xid;
1477         uint32                  epoch;
1478
1479         /*
1480          * This function is only safe during replay, because it depends on the
1481          * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
1482          */
1483         Assert(AmStartupProcess() || !IsUnderPostmaster);
1484
1485         xid = XLogRecGetXid(record);
1486         next_xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
1487         epoch = EpochFromFullTransactionId(ShmemVariableCache->nextFullXid);
1488
1489         /*
1490          * If xid is numerically greater than next_xid, it has to be from the
1491          * last epoch.
1492          */
1493         if (unlikely(xid > next_xid))
1494                 --epoch;
1495
1496         return FullTransactionIdFromEpochAndXid(epoch, xid);
1497 }
1498
1499 #endif