]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xloginsert.c
03b05f937f024db731dad68fbabcc50ed4a2d669
[postgresql] / src / backend / access / transam / xloginsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * xloginsert.c
4  *              Functions for constructing WAL records
5  *
6  * Constructing a WAL record begins with a call to XLogBeginInsert,
7  * followed by a number of XLogRegister* calls. The registered data is
8  * collected in private working memory, and finally assembled into a chain
9  * of XLogRecData structs by a call to XLogRecordAssemble(). See
10  * access/transam/README for details.
11  *
12  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * src/backend/access/transam/xloginsert.c
16  *
17  *-------------------------------------------------------------------------
18  */
19
20 #include "postgres.h"
21
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "miscadmin.h"
29 #include "replication/origin.h"
30 #include "storage/bufmgr.h"
31 #include "storage/proc.h"
32 #include "utils/memutils.h"
33 #include "pg_trace.h"
34
35 /* Buffer size required to store a compressed version of backup block image */
36 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
37
38 /*
39  * For each block reference registered with XLogRegisterBuffer, we fill in
40  * a registered_buffer struct.
41  */
42 typedef struct
43 {
44         bool            in_use;                 /* is this slot in use? */
45         uint8           flags;                  /* REGBUF_* flags */
46         RelFileNode rnode;                      /* identifies the relation and block */
47         ForkNumber      forkno;
48         BlockNumber block;
49         Page            page;                   /* page content */
50         uint32          rdata_len;              /* total length of data in rdata chain */
51         XLogRecData *rdata_head;        /* head of the chain of data registered with
52                                                                  * this block */
53         XLogRecData *rdata_tail;        /* last entry in the chain, or &rdata_head if
54                                                                  * empty */
55
56         XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
57                                                                  * backup block data in XLogRecordAssemble() */
58
59         /* buffer to store a compressed version of backup block image */
60         char            compressed_page[PGLZ_MAX_BLCKSZ];
61 } registered_buffer;
62
63 static registered_buffer *registered_buffers;
64 static int      max_registered_buffers;         /* allocated size */
65 static int      max_registered_block_id = 0;            /* highest block_id + 1
66                                                                                                  * currently registered */
67
68 /*
69  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
70  * with XLogRegisterData(...).
71  */
72 static XLogRecData *mainrdata_head;
73 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
74 static uint32 mainrdata_len;    /* total # of bytes in chain */
75
76 /* flags for the in-progress insertion */
77 static uint8 curinsert_flags = 0;
78
79 /*
80  * These are used to hold the record header while constructing a record.
81  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
82  * because we want it to be MAXALIGNed and padding bytes zeroed.
83  *
84  * For simplicity, it's allocated large enough to hold the headers for any
85  * WAL record.
86  */
87 static XLogRecData hdr_rdt;
88 static char *hdr_scratch = NULL;
89
90 #define SizeOfXlogOrigin        (sizeof(RepOriginId) + sizeof(char))
91
92 #define HEADER_SCRATCH_SIZE \
93         (SizeOfXLogRecord + \
94          MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
95          SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
96
97 /*
98  * An array of XLogRecData structs, to hold registered data.
99  */
100 static XLogRecData *rdatas;
101 static int      num_rdatas;                     /* entries currently used */
102 static int      max_rdatas;                     /* allocated size */
103
104 static bool begininsert_called = false;
105
106 /* Memory context to hold the registered buffer and data references. */
107 static MemoryContext xloginsert_cxt;
108
109 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
110                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
111                                    XLogRecPtr *fpw_lsn);
112 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
113                                                 uint16 hole_length, char *dest, uint16 *dlen);
114
115 /*
116  * Begin constructing a WAL record. This must be called before the
117  * XLogRegister* functions and XLogInsert().
118  */
119 void
120 XLogBeginInsert(void)
121 {
122         Assert(max_registered_block_id == 0);
123         Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
124         Assert(mainrdata_len == 0);
125
126         /* cross-check on whether we should be here or not */
127         if (!XLogInsertAllowed())
128                 elog(ERROR, "cannot make new WAL entries during recovery");
129
130         if (begininsert_called)
131                 elog(ERROR, "XLogBeginInsert was already called");
132
133         begininsert_called = true;
134 }
135
136 /*
137  * Ensure that there are enough buffer and data slots in the working area,
138  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
139  * calls.
140  *
141  * There is always space for a small number of buffers and data chunks, enough
142  * for most record types. This function is for the exceptional cases that need
143  * more.
144  */
145 void
146 XLogEnsureRecordSpace(int max_block_id, int ndatas)
147 {
148         int                     nbuffers;
149
150         /*
151          * This must be called before entering a critical section, because
152          * allocating memory inside a critical section can fail. repalloc() will
153          * check the same, but better to check it here too so that we fail
154          * consistently even if the arrays happen to be large enough already.
155          */
156         Assert(CritSectionCount == 0);
157
158         /* the minimum values can't be decreased */
159         if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
160                 max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
161         if (ndatas < XLR_NORMAL_RDATAS)
162                 ndatas = XLR_NORMAL_RDATAS;
163
164         if (max_block_id > XLR_MAX_BLOCK_ID)
165                 elog(ERROR, "maximum number of WAL record block references exceeded");
166         nbuffers = max_block_id + 1;
167
168         if (nbuffers > max_registered_buffers)
169         {
170                 registered_buffers = (registered_buffer *)
171                         repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
172
173                 /*
174                  * At least the padding bytes in the structs must be zeroed, because
175                  * they are included in WAL data, but initialize it all for tidiness.
176                  */
177                 MemSet(&registered_buffers[max_registered_buffers], 0,
178                         (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
179                 max_registered_buffers = nbuffers;
180         }
181
182         if (ndatas > max_rdatas)
183         {
184                 rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
185                 max_rdatas = ndatas;
186         }
187 }
188
189 /*
190  * Reset WAL record construction buffers.
191  */
192 void
193 XLogResetInsertion(void)
194 {
195         int                     i;
196
197         for (i = 0; i < max_registered_block_id; i++)
198                 registered_buffers[i].in_use = false;
199
200         num_rdatas = 0;
201         max_registered_block_id = 0;
202         mainrdata_len = 0;
203         mainrdata_last = (XLogRecData *) &mainrdata_head;
204         curinsert_flags = 0;
205         begininsert_called = false;
206 }
207
208 /*
209  * Register a reference to a buffer with the WAL record being constructed.
210  * This must be called for every page that the WAL-logged operation modifies.
211  */
212 void
213 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
214 {
215         registered_buffer *regbuf;
216
217         /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
218         Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
219         Assert(begininsert_called);
220
221         if (block_id >= max_registered_block_id)
222         {
223                 if (block_id >= max_registered_buffers)
224                         elog(ERROR, "too many registered buffers");
225                 max_registered_block_id = block_id + 1;
226         }
227
228         regbuf = &registered_buffers[block_id];
229
230         BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
231         regbuf->page = BufferGetPage(buffer);
232         regbuf->flags = flags;
233         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
234         regbuf->rdata_len = 0;
235
236         /*
237          * Check that this page hasn't already been registered with some other
238          * block_id.
239          */
240 #ifdef USE_ASSERT_CHECKING
241         {
242                 int                     i;
243
244                 for (i = 0; i < max_registered_block_id; i++)
245                 {
246                         registered_buffer *regbuf_old = &registered_buffers[i];
247
248                         if (i == block_id || !regbuf_old->in_use)
249                                 continue;
250
251                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
252                                    regbuf_old->forkno != regbuf->forkno ||
253                                    regbuf_old->block != regbuf->block);
254                 }
255         }
256 #endif
257
258         regbuf->in_use = true;
259 }
260
261 /*
262  * Like XLogRegisterBuffer, but for registering a block that's not in the
263  * shared buffer pool (i.e. when you don't have a Buffer for it).
264  */
265 void
266 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
267                                   BlockNumber blknum, Page page, uint8 flags)
268 {
269         registered_buffer *regbuf;
270
271         /* This is currently only used to WAL-log a full-page image of a page */
272         Assert(flags & REGBUF_FORCE_IMAGE);
273         Assert(begininsert_called);
274
275         if (block_id >= max_registered_block_id)
276                 max_registered_block_id = block_id + 1;
277
278         if (block_id >= max_registered_buffers)
279                 elog(ERROR, "too many registered buffers");
280
281         regbuf = &registered_buffers[block_id];
282
283         regbuf->rnode = *rnode;
284         regbuf->forkno = forknum;
285         regbuf->block = blknum;
286         regbuf->page = page;
287         regbuf->flags = flags;
288         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
289         regbuf->rdata_len = 0;
290
291         /*
292          * Check that this page hasn't already been registered with some other
293          * block_id.
294          */
295 #ifdef USE_ASSERT_CHECKING
296         {
297                 int                     i;
298
299                 for (i = 0; i < max_registered_block_id; i++)
300                 {
301                         registered_buffer *regbuf_old = &registered_buffers[i];
302
303                         if (i == block_id || !regbuf_old->in_use)
304                                 continue;
305
306                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
307                                    regbuf_old->forkno != regbuf->forkno ||
308                                    regbuf_old->block != regbuf->block);
309                 }
310         }
311 #endif
312
313         regbuf->in_use = true;
314 }
315
316 /*
317  * Add data to the WAL record that's being constructed.
318  *
319  * The data is appended to the "main chunk", available at replay with
320  * XLogRecGetData().
321  */
322 void
323 XLogRegisterData(char *data, int len)
324 {
325         XLogRecData *rdata;
326
327         Assert(begininsert_called);
328
329         if (num_rdatas >= max_rdatas)
330                 elog(ERROR, "too much WAL data");
331         rdata = &rdatas[num_rdatas++];
332
333         rdata->data = data;
334         rdata->len = len;
335
336         /*
337          * we use the mainrdata_last pointer to track the end of the chain, so no
338          * need to clear 'next' here.
339          */
340
341         mainrdata_last->next = rdata;
342         mainrdata_last = rdata;
343
344         mainrdata_len += len;
345 }
346
347 /*
348  * Add buffer-specific data to the WAL record that's being constructed.
349  *
350  * Block_id must reference a block previously registered with
351  * XLogRegisterBuffer(). If this is called more than once for the same
352  * block_id, the data is appended.
353  *
354  * The maximum amount of data that can be registered per block is 65535
355  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
356  * reconstruct the changes to the page, you might as well just log a full
357  * copy of it. (the "main data" that's not associated with a block is not
358  * limited)
359  */
360 void
361 XLogRegisterBufData(uint8 block_id, char *data, int len)
362 {
363         registered_buffer *regbuf;
364         XLogRecData *rdata;
365
366         Assert(begininsert_called);
367
368         /* find the registered buffer struct */
369         regbuf = &registered_buffers[block_id];
370         if (!regbuf->in_use)
371                 elog(ERROR, "no block with id %d registered with WAL insertion",
372                          block_id);
373
374         if (num_rdatas >= max_rdatas)
375                 elog(ERROR, "too much WAL data");
376         rdata = &rdatas[num_rdatas++];
377
378         rdata->data = data;
379         rdata->len = len;
380
381         regbuf->rdata_tail->next = rdata;
382         regbuf->rdata_tail = rdata;
383         regbuf->rdata_len += len;
384 }
385
386 /*
387  * Set insert status flags for the upcoming WAL record.
388  *
389  * The flags that can be used here are:
390  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
391  *   included in the record.
392  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
393  *   durability, which allows to avoid triggering WAL archiving and other
394  *   background activity.
395  */
396 void
397 XLogSetRecordFlags(uint8 flags)
398 {
399         Assert(begininsert_called);
400         curinsert_flags = flags;
401 }
402
403 /*
404  * Insert an XLOG record having the specified RMID and info bytes, with the
405  * body of the record being the data and buffer references registered earlier
406  * with XLogRegister* calls.
407  *
408  * Returns XLOG pointer to end of record (beginning of next record).
409  * This can be used as LSN for data pages affected by the logged action.
410  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
411  * before the data page can be written out.  This implements the basic
412  * WAL rule "write the log before the data".)
413  */
414 XLogRecPtr
415 XLogInsert(RmgrId rmid, uint8 info)
416 {
417         XLogRecPtr      EndPos;
418
419         /* XLogBeginInsert() must have been called. */
420         if (!begininsert_called)
421                 elog(ERROR, "XLogBeginInsert was not called");
422
423         /*
424          * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
425          * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
426          */
427         if ((info & ~(XLR_RMGR_INFO_MASK |
428                                   XLR_SPECIAL_REL_UPDATE |
429                                   XLR_CHECK_CONSISTENCY)) != 0)
430                 elog(PANIC, "invalid xlog info mask %02X", info);
431
432         TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
433
434         /*
435          * In bootstrap mode, we don't actually log anything but XLOG resources;
436          * return a phony record pointer.
437          */
438         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
439         {
440                 XLogResetInsertion();
441                 EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
442                 return EndPos;
443         }
444
445         do
446         {
447                 XLogRecPtr      RedoRecPtr;
448                 bool            doPageWrites;
449                 XLogRecPtr      fpw_lsn;
450                 XLogRecData *rdt;
451
452                 /*
453                  * Get values needed to decide whether to do full-page writes. Since
454                  * we don't yet have an insertion lock, these could change under us,
455                  * but XLogInsertRecord will recheck them once it has a lock.
456                  */
457                 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
458
459                 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
460                                                                  &fpw_lsn);
461
462                 EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
463         } while (EndPos == InvalidXLogRecPtr);
464
465         XLogResetInsertion();
466
467         return EndPos;
468 }
469
470 /*
471  * Assemble a WAL record from the registered data and buffers into an
472  * XLogRecData chain, ready for insertion with XLogInsertRecord().
473  *
474  * The record header fields are filled in, except for the xl_prev field. The
475  * calculated CRC does not include the record header yet.
476  *
477  * If there are any registered buffers, and a full-page image was not taken
478  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
479  * signals that the assembled record is only good for insertion on the
480  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
481  */
482 static XLogRecData *
483 XLogRecordAssemble(RmgrId rmid, uint8 info,
484                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
485                                    XLogRecPtr *fpw_lsn)
486 {
487         XLogRecData *rdt;
488         uint32          total_len = 0;
489         int                     block_id;
490         pg_crc32c       rdata_crc;
491         registered_buffer *prev_regbuf = NULL;
492         XLogRecData *rdt_datas_last;
493         XLogRecord *rechdr;
494         char       *scratch = hdr_scratch;
495
496         /*
497          * Note: this function can be called multiple times for the same record.
498          * All the modifications we do to the rdata chains below must handle that.
499          */
500
501         /* The record begins with the fixed-size header */
502         rechdr = (XLogRecord *) scratch;
503         scratch += SizeOfXLogRecord;
504
505         hdr_rdt.next = NULL;
506         rdt_datas_last = &hdr_rdt;
507         hdr_rdt.data = hdr_scratch;
508
509         /*
510          * Enforce consistency checks for this record if user is looking for
511          * it. Do this before at the beginning of this routine to give the
512          * possibility for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY
513          * directly for a record.
514          */
515         if (wal_consistency_checking[rmid])
516                 info |= XLR_CHECK_CONSISTENCY;
517
518         /*
519          * Make an rdata chain containing all the data portions of all block
520          * references. This includes the data for full-page images. Also append
521          * the headers for the block references in the scratch buffer.
522          */
523         *fpw_lsn = InvalidXLogRecPtr;
524         for (block_id = 0; block_id < max_registered_block_id; block_id++)
525         {
526                 registered_buffer *regbuf = &registered_buffers[block_id];
527                 bool            needs_backup;
528                 bool            needs_data;
529                 XLogRecordBlockHeader bkpb;
530                 XLogRecordBlockImageHeader bimg;
531                 XLogRecordBlockCompressHeader cbimg = {0};
532                 bool            samerel;
533                 bool            is_compressed = false;
534                 bool            include_image;
535
536                 if (!regbuf->in_use)
537                         continue;
538
539                 /* Determine if this block needs to be backed up */
540                 if (regbuf->flags & REGBUF_FORCE_IMAGE)
541                         needs_backup = true;
542                 else if (regbuf->flags & REGBUF_NO_IMAGE)
543                         needs_backup = false;
544                 else if (!doPageWrites)
545                         needs_backup = false;
546                 else
547                 {
548                         /*
549                          * We assume page LSN is first data on *every* page that can be
550                          * passed to XLogInsert, whether it has the standard page layout
551                          * or not.
552                          */
553                         XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
554
555                         needs_backup = (page_lsn <= RedoRecPtr);
556                         if (!needs_backup)
557                         {
558                                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
559                                         *fpw_lsn = page_lsn;
560                         }
561                 }
562
563                 /* Determine if the buffer data needs to included */
564                 if (regbuf->rdata_len == 0)
565                         needs_data = false;
566                 else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
567                         needs_data = true;
568                 else
569                         needs_data = !needs_backup;
570
571                 bkpb.id = block_id;
572                 bkpb.fork_flags = regbuf->forkno;
573                 bkpb.data_length = 0;
574
575                 if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
576                         bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
577
578                 /*
579                  * If needs_backup is true or WAL checking is enabled for
580                  * current resource manager, log a full-page write for the current
581                  * block.
582                  */
583                 include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
584
585                 if (include_image)
586                 {
587                         Page            page = regbuf->page;
588                         uint16          compressed_len;
589
590                         /*
591                          * The page needs to be backed up, so calculate its hole length
592                          * and offset.
593                          */
594                         if (regbuf->flags & REGBUF_STANDARD)
595                         {
596                                 /* Assume we can omit data between pd_lower and pd_upper */
597                                 uint16          lower = ((PageHeader) page)->pd_lower;
598                                 uint16          upper = ((PageHeader) page)->pd_upper;
599
600                                 if (lower >= SizeOfPageHeaderData &&
601                                         upper > lower &&
602                                         upper <= BLCKSZ)
603                                 {
604                                         bimg.hole_offset = lower;
605                                         cbimg.hole_length = upper - lower;
606                                 }
607                                 else
608                                 {
609                                         /* No "hole" to compress out */
610                                         bimg.hole_offset = 0;
611                                         cbimg.hole_length = 0;
612                                 }
613                         }
614                         else
615                         {
616                                 /* Not a standard page header, don't try to eliminate "hole" */
617                                 bimg.hole_offset = 0;
618                                 cbimg.hole_length = 0;
619                         }
620
621                         /*
622                          * Try to compress a block image if wal_compression is enabled
623                          */
624                         if (wal_compression)
625                         {
626                                 is_compressed =
627                                         XLogCompressBackupBlock(page, bimg.hole_offset,
628                                                                                         cbimg.hole_length,
629                                                                                         regbuf->compressed_page,
630                                                                                         &compressed_len);
631                         }
632
633                         /*
634                          * Fill in the remaining fields in the XLogRecordBlockHeader
635                          * struct
636                          */
637                         bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
638
639                         /*
640                          * Construct XLogRecData entries for the page content.
641                          */
642                         rdt_datas_last->next = &regbuf->bkp_rdatas[0];
643                         rdt_datas_last = rdt_datas_last->next;
644
645                         bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
646
647                         /*
648                          * If WAL consistency checking is enabled for the resource manager of
649                          * this WAL record, a full-page image is included in the record
650                          * for the block modified. During redo, the full-page is replayed
651                          * only if BKPIMAGE_APPLY is set.
652                          */
653                         if (needs_backup)
654                                 bimg.bimg_info |= BKPIMAGE_APPLY;
655
656                         if (is_compressed)
657                         {
658                                 bimg.length = compressed_len;
659                                 bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
660
661                                 rdt_datas_last->data = regbuf->compressed_page;
662                                 rdt_datas_last->len = compressed_len;
663                         }
664                         else
665                         {
666                                 bimg.length = BLCKSZ - cbimg.hole_length;
667
668                                 if (cbimg.hole_length == 0)
669                                 {
670                                         rdt_datas_last->data = page;
671                                         rdt_datas_last->len = BLCKSZ;
672                                 }
673                                 else
674                                 {
675                                         /* must skip the hole */
676                                         rdt_datas_last->data = page;
677                                         rdt_datas_last->len = bimg.hole_offset;
678
679                                         rdt_datas_last->next = &regbuf->bkp_rdatas[1];
680                                         rdt_datas_last = rdt_datas_last->next;
681
682                                         rdt_datas_last->data =
683                                                 page + (bimg.hole_offset + cbimg.hole_length);
684                                         rdt_datas_last->len =
685                                                 BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
686                                 }
687                         }
688
689                         total_len += bimg.length;
690                 }
691
692                 if (needs_data)
693                 {
694                         /*
695                          * Link the caller-supplied rdata chain for this buffer to the
696                          * overall list.
697                          */
698                         bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
699                         bkpb.data_length = regbuf->rdata_len;
700                         total_len += regbuf->rdata_len;
701
702                         rdt_datas_last->next = regbuf->rdata_head;
703                         rdt_datas_last = regbuf->rdata_tail;
704                 }
705
706                 if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
707                 {
708                         samerel = true;
709                         bkpb.fork_flags |= BKPBLOCK_SAME_REL;
710                 }
711                 else
712                         samerel = false;
713                 prev_regbuf = regbuf;
714
715                 /* Ok, copy the header to the scratch buffer */
716                 memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
717                 scratch += SizeOfXLogRecordBlockHeader;
718                 if (include_image)
719                 {
720                         memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
721                         scratch += SizeOfXLogRecordBlockImageHeader;
722                         if (cbimg.hole_length != 0 && is_compressed)
723                         {
724                                 memcpy(scratch, &cbimg,
725                                            SizeOfXLogRecordBlockCompressHeader);
726                                 scratch += SizeOfXLogRecordBlockCompressHeader;
727                         }
728                 }
729                 if (!samerel)
730                 {
731                         memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
732                         scratch += sizeof(RelFileNode);
733                 }
734                 memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
735                 scratch += sizeof(BlockNumber);
736         }
737
738         /* followed by the record's origin, if any */
739         if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
740                 replorigin_session_origin != InvalidRepOriginId)
741         {
742                 *(scratch++) = XLR_BLOCK_ID_ORIGIN;
743                 memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
744                 scratch += sizeof(replorigin_session_origin);
745         }
746
747         /* followed by main data, if any */
748         if (mainrdata_len > 0)
749         {
750                 if (mainrdata_len > 255)
751                 {
752                         *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
753                         memcpy(scratch, &mainrdata_len, sizeof(uint32));
754                         scratch += sizeof(uint32);
755                 }
756                 else
757                 {
758                         *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
759                         *(scratch++) = (uint8) mainrdata_len;
760                 }
761                 rdt_datas_last->next = mainrdata_head;
762                 rdt_datas_last = mainrdata_last;
763                 total_len += mainrdata_len;
764         }
765         rdt_datas_last->next = NULL;
766
767         hdr_rdt.len = (scratch - hdr_scratch);
768         total_len += hdr_rdt.len;
769
770         /*
771          * Calculate CRC of the data
772          *
773          * Note that the record header isn't added into the CRC initially since we
774          * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
775          * the whole record in the order: rdata, then backup blocks, then record
776          * header.
777          */
778         INIT_CRC32C(rdata_crc);
779         COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
780         for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
781                 COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
782
783         /*
784          * Fill in the fields in the record header. Prev-link is filled in later,
785          * once we know where in the WAL the record will be inserted. The CRC does
786          * not include the record header yet.
787          */
788         rechdr->xl_xid = GetCurrentTransactionIdIfAny();
789         rechdr->xl_tot_len = total_len;
790         rechdr->xl_info = info;
791         rechdr->xl_rmid = rmid;
792         rechdr->xl_prev = InvalidXLogRecPtr;
793         rechdr->xl_crc = rdata_crc;
794
795         return &hdr_rdt;
796 }
797
798 /*
799  * Create a compressed version of a backup block image.
800  *
801  * Returns FALSE if compression fails (i.e., compressed result is actually
802  * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
803  * the length of compressed block image.
804  */
805 static bool
806 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
807                                                 char *dest, uint16 *dlen)
808 {
809         int32           orig_len = BLCKSZ - hole_length;
810         int32           len;
811         int32           extra_bytes = 0;
812         char       *source;
813         char            tmp[BLCKSZ];
814
815         if (hole_length != 0)
816         {
817                 /* must skip the hole */
818                 source = tmp;
819                 memcpy(source, page, hole_offset);
820                 memcpy(source + hole_offset,
821                            page + (hole_offset + hole_length),
822                            BLCKSZ - (hole_length + hole_offset));
823
824                 /*
825                  * Extra data needs to be stored in WAL record for the compressed
826                  * version of block image if the hole exists.
827                  */
828                 extra_bytes = SizeOfXLogRecordBlockCompressHeader;
829         }
830         else
831                 source = page;
832
833         /*
834          * We recheck the actual size even if pglz_compress() reports success and
835          * see if the number of bytes saved by compression is larger than the
836          * length of extra data needed for the compressed version of block image.
837          */
838         len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
839         if (len >= 0 &&
840                 len + extra_bytes < orig_len)
841         {
842                 *dlen = (uint16) len;   /* successful compression */
843                 return true;
844         }
845         return false;
846 }
847
848 /*
849  * Determine whether the buffer referenced has to be backed up.
850  *
851  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
852  * could change later, so the result should be used for optimization purposes
853  * only.
854  */
855 bool
856 XLogCheckBufferNeedsBackup(Buffer buffer)
857 {
858         XLogRecPtr      RedoRecPtr;
859         bool            doPageWrites;
860         Page            page;
861
862         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
863
864         page = BufferGetPage(buffer);
865
866         if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
867                 return true;                    /* buffer requires backup */
868
869         return false;                           /* buffer does not need to be backed up */
870 }
871
872 /*
873  * Write a backup block if needed when we are setting a hint. Note that
874  * this may be called for a variety of page types, not just heaps.
875  *
876  * Callable while holding just share lock on the buffer content.
877  *
878  * We can't use the plain backup block mechanism since that relies on the
879  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
880  * bits) are allowed in a sharelocked buffer that can lead to wal checksum
881  * failures. So instead we copy the page and insert the copied data as normal
882  * record data.
883  *
884  * We only need to do something if page has not yet been full page written in
885  * this checkpoint round. The LSN of the inserted wal record is returned if we
886  * had to write, InvalidXLogRecPtr otherwise.
887  *
888  * It is possible that multiple concurrent backends could attempt to write WAL
889  * records. In that case, multiple copies of the same block would be recorded
890  * in separate WAL records by different backends, though that is still OK from
891  * a correctness perspective.
892  */
893 XLogRecPtr
894 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
895 {
896         XLogRecPtr      recptr = InvalidXLogRecPtr;
897         XLogRecPtr      lsn;
898         XLogRecPtr      RedoRecPtr;
899
900         /*
901          * Ensure no checkpoint can change our view of RedoRecPtr.
902          */
903         Assert(MyPgXact->delayChkpt);
904
905         /*
906          * Update RedoRecPtr so that we can make the right decision
907          */
908         RedoRecPtr = GetRedoRecPtr();
909
910         /*
911          * We assume page LSN is first data on *every* page that can be passed to
912          * XLogInsert, whether it has the standard page layout or not. Since we're
913          * only holding a share-lock on the page, we must take the buffer header
914          * lock when we look at the LSN.
915          */
916         lsn = BufferGetLSNAtomic(buffer);
917
918         if (lsn <= RedoRecPtr)
919         {
920                 int                     flags;
921                 char            copied_buffer[BLCKSZ];
922                 char       *origdata = (char *) BufferGetBlock(buffer);
923                 RelFileNode rnode;
924                 ForkNumber      forkno;
925                 BlockNumber blkno;
926
927                 /*
928                  * Copy buffer so we don't have to worry about concurrent hint bit or
929                  * lsn updates. We assume pd_lower/upper cannot be changed without an
930                  * exclusive lock, so the contents bkp are not racy.
931                  */
932                 if (buffer_std)
933                 {
934                         /* Assume we can omit data between pd_lower and pd_upper */
935                         Page            page = BufferGetPage(buffer);
936                         uint16          lower = ((PageHeader) page)->pd_lower;
937                         uint16          upper = ((PageHeader) page)->pd_upper;
938
939                         memcpy(copied_buffer, origdata, lower);
940                         memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
941                 }
942                 else
943                         memcpy(copied_buffer, origdata, BLCKSZ);
944
945                 XLogBeginInsert();
946
947                 flags = REGBUF_FORCE_IMAGE;
948                 if (buffer_std)
949                         flags |= REGBUF_STANDARD;
950
951                 BufferGetTag(buffer, &rnode, &forkno, &blkno);
952                 XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
953
954                 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
955         }
956
957         return recptr;
958 }
959
960 /*
961  * Write a WAL record containing a full image of a page. Caller is responsible
962  * for writing the page to disk after calling this routine.
963  *
964  * Note: If you're using this function, you should be building pages in private
965  * memory and writing them directly to smgr.  If you're using buffers, call
966  * log_newpage_buffer instead.
967  *
968  * If the page follows the standard page layout, with a PageHeader and unused
969  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
970  * the unused space to be left out from the WAL record, making it smaller.
971  */
972 XLogRecPtr
973 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
974                         Page page, bool page_std)
975 {
976         int                     flags;
977         XLogRecPtr      recptr;
978
979         flags = REGBUF_FORCE_IMAGE;
980         if (page_std)
981                 flags |= REGBUF_STANDARD;
982
983         XLogBeginInsert();
984         XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
985         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
986
987         /*
988          * The page may be uninitialized. If so, we can't set the LSN because that
989          * would corrupt the page.
990          */
991         if (!PageIsNew(page))
992         {
993                 PageSetLSN(page, recptr);
994         }
995
996         return recptr;
997 }
998
999 /*
1000  * Write a WAL record containing a full image of a page.
1001  *
1002  * Caller should initialize the buffer and mark it dirty before calling this
1003  * function.  This function will set the page LSN.
1004  *
1005  * If the page follows the standard page layout, with a PageHeader and unused
1006  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
1007  * the unused space to be left out from the WAL record, making it smaller.
1008  */
1009 XLogRecPtr
1010 log_newpage_buffer(Buffer buffer, bool page_std)
1011 {
1012         Page            page = BufferGetPage(buffer);
1013         RelFileNode rnode;
1014         ForkNumber      forkNum;
1015         BlockNumber blkno;
1016
1017         /* Shared buffers should be modified in a critical section. */
1018         Assert(CritSectionCount > 0);
1019
1020         BufferGetTag(buffer, &rnode, &forkNum, &blkno);
1021
1022         return log_newpage(&rnode, forkNum, blkno, page, page_std);
1023 }
1024
1025 /*
1026  * Allocate working buffers needed for WAL record construction.
1027  */
1028 void
1029 InitXLogInsert(void)
1030 {
1031         /* Initialize the working areas */
1032         if (xloginsert_cxt == NULL)
1033         {
1034                 xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1035                                                                                            "WAL record construction",
1036                                                                                            ALLOCSET_DEFAULT_SIZES);
1037         }
1038
1039         if (registered_buffers == NULL)
1040         {
1041                 registered_buffers = (registered_buffer *)
1042                         MemoryContextAllocZero(xloginsert_cxt,
1043                                   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1044                 max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1045         }
1046         if (rdatas == NULL)
1047         {
1048                 rdatas = MemoryContextAlloc(xloginsert_cxt,
1049                                                                         sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1050                 max_rdatas = XLR_NORMAL_RDATAS;
1051         }
1052
1053         /*
1054          * Allocate a buffer to hold the header information for a WAL record.
1055          */
1056         if (hdr_scratch == NULL)
1057                 hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1058                                                                                          HEADER_SCRATCH_SIZE);
1059 }