]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xloginsert.c
pgindent run for 9.5
[postgresql] / src / backend / access / transam / xloginsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * xloginsert.c
4  *              Functions for constructing WAL records
5  *
6  * Constructing a WAL record begins with a call to XLogBeginInsert,
7  * followed by a number of XLogRegister* calls. The registered data is
8  * collected in private working memory, and finally assembled into a chain
9  * of XLogRecData structs by a call to XLogRecordAssemble(). See
10  * access/transam/README for details.
11  *
12  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * src/backend/access/transam/xloginsert.c
16  *
17  *-------------------------------------------------------------------------
18  */
19
20 #include "postgres.h"
21
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "miscadmin.h"
29 #include "replication/origin.h"
30 #include "storage/bufmgr.h"
31 #include "storage/proc.h"
32 #include "utils/memutils.h"
33 #include "pg_trace.h"
34
35 /* Buffer size required to store a compressed version of backup block image */
36 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
37
38 /*
39  * For each block reference registered with XLogRegisterBuffer, we fill in
40  * a registered_buffer struct.
41  */
42 typedef struct
43 {
44         bool            in_use;                 /* is this slot in use? */
45         uint8           flags;                  /* REGBUF_* flags */
46         RelFileNode rnode;                      /* identifies the relation and block */
47         ForkNumber      forkno;
48         BlockNumber block;
49         Page            page;                   /* page content */
50         uint32          rdata_len;              /* total length of data in rdata chain */
51         XLogRecData *rdata_head;        /* head of the chain of data registered with
52                                                                  * this block */
53         XLogRecData *rdata_tail;        /* last entry in the chain, or &rdata_head if
54                                                                  * empty */
55
56         XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
57                                                                  * backup block data in XLogRecordAssemble() */
58
59         /* buffer to store a compressed version of backup block image */
60         char            compressed_page[PGLZ_MAX_BLCKSZ];
61 } registered_buffer;
62
63 static registered_buffer *registered_buffers;
64 static int      max_registered_buffers;         /* allocated size */
65 static int      max_registered_block_id = 0;            /* highest block_id + 1
66                                                                                                  * currently registered */
67
68 /*
69  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
70  * with XLogRegisterData(...).
71  */
72 static XLogRecData *mainrdata_head;
73 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
74 static uint32 mainrdata_len;    /* total # of bytes in chain */
75
76 /* Should te in-progress insertion log the origin */
77 static bool include_origin = false;
78
79 /*
80  * These are used to hold the record header while constructing a record.
81  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
82  * because we want it to be MAXALIGNed and padding bytes zeroed.
83  *
84  * For simplicity, it's allocated large enough to hold the headers for any
85  * WAL record.
86  */
87 static XLogRecData hdr_rdt;
88 static char *hdr_scratch = NULL;
89
90 #define SizeOfXlogOrigin        (sizeof(RepOriginId) + sizeof(char))
91
92 #define HEADER_SCRATCH_SIZE \
93         (SizeOfXLogRecord + \
94          MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
95          SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
96
97 /*
98  * An array of XLogRecData structs, to hold registered data.
99  */
100 static XLogRecData *rdatas;
101 static int      num_rdatas;                     /* entries currently used */
102 static int      max_rdatas;                     /* allocated size */
103
104 static bool begininsert_called = false;
105
106 /* Memory context to hold the registered buffer and data references. */
107 static MemoryContext xloginsert_cxt;
108
109 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
110                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
111                                    XLogRecPtr *fpw_lsn);
112 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
113                                                 uint16 hole_length, char *dest, uint16 *dlen);
114
115 /*
116  * Begin constructing a WAL record. This must be called before the
117  * XLogRegister* functions and XLogInsert().
118  */
119 void
120 XLogBeginInsert(void)
121 {
122         Assert(max_registered_block_id == 0);
123         Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
124         Assert(mainrdata_len == 0);
125         Assert(!begininsert_called);
126
127         /* cross-check on whether we should be here or not */
128         if (!XLogInsertAllowed())
129                 elog(ERROR, "cannot make new WAL entries during recovery");
130
131         begininsert_called = true;
132 }
133
134 /*
135  * Ensure that there are enough buffer and data slots in the working area,
136  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
137  * calls.
138  *
139  * There is always space for a small number of buffers and data chunks, enough
140  * for most record types. This function is for the exceptional cases that need
141  * more.
142  */
143 void
144 XLogEnsureRecordSpace(int max_block_id, int ndatas)
145 {
146         int                     nbuffers;
147
148         /*
149          * This must be called before entering a critical section, because
150          * allocating memory inside a critical section can fail. repalloc() will
151          * check the same, but better to check it here too so that we fail
152          * consistently even if the arrays happen to be large enough already.
153          */
154         Assert(CritSectionCount == 0);
155
156         /* the minimum values can't be decreased */
157         if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
158                 max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
159         if (ndatas < XLR_NORMAL_RDATAS)
160                 ndatas = XLR_NORMAL_RDATAS;
161
162         if (max_block_id > XLR_MAX_BLOCK_ID)
163                 elog(ERROR, "maximum number of WAL record block references exceeded");
164         nbuffers = max_block_id + 1;
165
166         if (nbuffers > max_registered_buffers)
167         {
168                 registered_buffers = (registered_buffer *)
169                         repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
170
171                 /*
172                  * At least the padding bytes in the structs must be zeroed, because
173                  * they are included in WAL data, but initialize it all for tidiness.
174                  */
175                 MemSet(&registered_buffers[max_registered_buffers], 0,
176                         (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
177                 max_registered_buffers = nbuffers;
178         }
179
180         if (ndatas > max_rdatas)
181         {
182                 rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
183                 max_rdatas = ndatas;
184         }
185 }
186
187 /*
188  * Reset WAL record construction buffers.
189  */
190 void
191 XLogResetInsertion(void)
192 {
193         int                     i;
194
195         for (i = 0; i < max_registered_block_id; i++)
196                 registered_buffers[i].in_use = false;
197
198         num_rdatas = 0;
199         max_registered_block_id = 0;
200         mainrdata_len = 0;
201         mainrdata_last = (XLogRecData *) &mainrdata_head;
202         include_origin = false;
203         begininsert_called = false;
204 }
205
206 /*
207  * Register a reference to a buffer with the WAL record being constructed.
208  * This must be called for every page that the WAL-logged operation modifies.
209  */
210 void
211 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
212 {
213         registered_buffer *regbuf;
214
215         /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
216         Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
217         Assert(begininsert_called);
218
219         if (block_id >= max_registered_block_id)
220         {
221                 if (block_id >= max_registered_buffers)
222                         elog(ERROR, "too many registered buffers");
223                 max_registered_block_id = block_id + 1;
224         }
225
226         regbuf = &registered_buffers[block_id];
227
228         BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
229         regbuf->page = BufferGetPage(buffer);
230         regbuf->flags = flags;
231         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
232         regbuf->rdata_len = 0;
233
234         /*
235          * Check that this page hasn't already been registered with some other
236          * block_id.
237          */
238 #ifdef USE_ASSERT_CHECKING
239         {
240                 int                     i;
241
242                 for (i = 0; i < max_registered_block_id; i++)
243                 {
244                         registered_buffer *regbuf_old = &registered_buffers[i];
245
246                         if (i == block_id || !regbuf_old->in_use)
247                                 continue;
248
249                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
250                                    regbuf_old->forkno != regbuf->forkno ||
251                                    regbuf_old->block != regbuf->block);
252                 }
253         }
254 #endif
255
256         regbuf->in_use = true;
257 }
258
259 /*
260  * Like XLogRegisterBuffer, but for registering a block that's not in the
261  * shared buffer pool (i.e. when you don't have a Buffer for it).
262  */
263 void
264 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
265                                   BlockNumber blknum, Page page, uint8 flags)
266 {
267         registered_buffer *regbuf;
268
269         /* This is currently only used to WAL-log a full-page image of a page */
270         Assert(flags & REGBUF_FORCE_IMAGE);
271         Assert(begininsert_called);
272
273         if (block_id >= max_registered_block_id)
274                 max_registered_block_id = block_id + 1;
275
276         if (block_id >= max_registered_buffers)
277                 elog(ERROR, "too many registered buffers");
278
279         regbuf = &registered_buffers[block_id];
280
281         regbuf->rnode = *rnode;
282         regbuf->forkno = forknum;
283         regbuf->block = blknum;
284         regbuf->page = page;
285         regbuf->flags = flags;
286         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
287         regbuf->rdata_len = 0;
288
289         /*
290          * Check that this page hasn't already been registered with some other
291          * block_id.
292          */
293 #ifdef USE_ASSERT_CHECKING
294         {
295                 int                     i;
296
297                 for (i = 0; i < max_registered_block_id; i++)
298                 {
299                         registered_buffer *regbuf_old = &registered_buffers[i];
300
301                         if (i == block_id || !regbuf_old->in_use)
302                                 continue;
303
304                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
305                                    regbuf_old->forkno != regbuf->forkno ||
306                                    regbuf_old->block != regbuf->block);
307                 }
308         }
309 #endif
310
311         regbuf->in_use = true;
312 }
313
314 /*
315  * Add data to the WAL record that's being constructed.
316  *
317  * The data is appended to the "main chunk", available at replay with
318  * XLogRecGetData().
319  */
320 void
321 XLogRegisterData(char *data, int len)
322 {
323         XLogRecData *rdata;
324
325         Assert(begininsert_called);
326
327         if (num_rdatas >= max_rdatas)
328                 elog(ERROR, "too much WAL data");
329         rdata = &rdatas[num_rdatas++];
330
331         rdata->data = data;
332         rdata->len = len;
333
334         /*
335          * we use the mainrdata_last pointer to track the end of the chain, so no
336          * need to clear 'next' here.
337          */
338
339         mainrdata_last->next = rdata;
340         mainrdata_last = rdata;
341
342         mainrdata_len += len;
343 }
344
345 /*
346  * Add buffer-specific data to the WAL record that's being constructed.
347  *
348  * Block_id must reference a block previously registered with
349  * XLogRegisterBuffer(). If this is called more than once for the same
350  * block_id, the data is appended.
351  *
352  * The maximum amount of data that can be registered per block is 65535
353  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
354  * reconstruct the changes to the page, you might as well just log a full
355  * copy of it. (the "main data" that's not associated with a block is not
356  * limited)
357  */
358 void
359 XLogRegisterBufData(uint8 block_id, char *data, int len)
360 {
361         registered_buffer *regbuf;
362         XLogRecData *rdata;
363
364         Assert(begininsert_called);
365
366         /* find the registered buffer struct */
367         regbuf = &registered_buffers[block_id];
368         if (!regbuf->in_use)
369                 elog(ERROR, "no block with id %d registered with WAL insertion",
370                          block_id);
371
372         if (num_rdatas >= max_rdatas)
373                 elog(ERROR, "too much WAL data");
374         rdata = &rdatas[num_rdatas++];
375
376         rdata->data = data;
377         rdata->len = len;
378
379         regbuf->rdata_tail->next = rdata;
380         regbuf->rdata_tail = rdata;
381         regbuf->rdata_len += len;
382 }
383
384 /*
385  * Should this record include the replication origin if one is set up?
386  */
387 void
388 XLogIncludeOrigin(void)
389 {
390         Assert(begininsert_called);
391         include_origin = true;
392 }
393
394 /*
395  * Insert an XLOG record having the specified RMID and info bytes, with the
396  * body of the record being the data and buffer references registered earlier
397  * with XLogRegister* calls.
398  *
399  * Returns XLOG pointer to end of record (beginning of next record).
400  * This can be used as LSN for data pages affected by the logged action.
401  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
402  * before the data page can be written out.  This implements the basic
403  * WAL rule "write the log before the data".)
404  */
405 XLogRecPtr
406 XLogInsert(RmgrId rmid, uint8 info)
407 {
408         XLogRecPtr      EndPos;
409
410         /* XLogBeginInsert() must have been called. */
411         if (!begininsert_called)
412                 elog(ERROR, "XLogBeginInsert was not called");
413
414         /*
415          * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
416          * reserved for use by me.
417          */
418         if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
419                 elog(PANIC, "invalid xlog info mask %02X", info);
420
421         TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
422
423         /*
424          * In bootstrap mode, we don't actually log anything but XLOG resources;
425          * return a phony record pointer.
426          */
427         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
428         {
429                 XLogResetInsertion();
430                 EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
431                 return EndPos;
432         }
433
434         do
435         {
436                 XLogRecPtr      RedoRecPtr;
437                 bool            doPageWrites;
438                 XLogRecPtr      fpw_lsn;
439                 XLogRecData *rdt;
440
441                 /*
442                  * Get values needed to decide whether to do full-page writes. Since
443                  * we don't yet have an insertion lock, these could change under us,
444                  * but XLogInsertRecData will recheck them once it has a lock.
445                  */
446                 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
447
448                 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
449                                                                  &fpw_lsn);
450
451                 EndPos = XLogInsertRecord(rdt, fpw_lsn);
452         } while (EndPos == InvalidXLogRecPtr);
453
454         XLogResetInsertion();
455
456         return EndPos;
457 }
458
459 /*
460  * Assemble a WAL record from the registered data and buffers into an
461  * XLogRecData chain, ready for insertion with XLogInsertRecord().
462  *
463  * The record header fields are filled in, except for the xl_prev field. The
464  * calculated CRC does not include the record header yet.
465  *
466  * If there are any registered buffers, and a full-page image was not taken
467  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
468  * signals that the assembled record is only good for insertion on the
469  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
470  */
471 static XLogRecData *
472 XLogRecordAssemble(RmgrId rmid, uint8 info,
473                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
474                                    XLogRecPtr *fpw_lsn)
475 {
476         XLogRecData *rdt;
477         uint32          total_len = 0;
478         int                     block_id;
479         pg_crc32c       rdata_crc;
480         registered_buffer *prev_regbuf = NULL;
481         XLogRecData *rdt_datas_last;
482         XLogRecord *rechdr;
483         char       *scratch = hdr_scratch;
484
485         /*
486          * Note: this function can be called multiple times for the same record.
487          * All the modifications we do to the rdata chains below must handle that.
488          */
489
490         /* The record begins with the fixed-size header */
491         rechdr = (XLogRecord *) scratch;
492         scratch += SizeOfXLogRecord;
493
494         hdr_rdt.next = NULL;
495         rdt_datas_last = &hdr_rdt;
496         hdr_rdt.data = hdr_scratch;
497
498         /*
499          * Make an rdata chain containing all the data portions of all block
500          * references. This includes the data for full-page images. Also append
501          * the headers for the block references in the scratch buffer.
502          */
503         *fpw_lsn = InvalidXLogRecPtr;
504         for (block_id = 0; block_id < max_registered_block_id; block_id++)
505         {
506                 registered_buffer *regbuf = &registered_buffers[block_id];
507                 bool            needs_backup;
508                 bool            needs_data;
509                 XLogRecordBlockHeader bkpb;
510                 XLogRecordBlockImageHeader bimg;
511                 XLogRecordBlockCompressHeader cbimg = {0};
512                 bool            samerel;
513                 bool            is_compressed = false;
514
515                 if (!regbuf->in_use)
516                         continue;
517
518                 /* Determine if this block needs to be backed up */
519                 if (regbuf->flags & REGBUF_FORCE_IMAGE)
520                         needs_backup = true;
521                 else if (regbuf->flags & REGBUF_NO_IMAGE)
522                         needs_backup = false;
523                 else if (!doPageWrites)
524                         needs_backup = false;
525                 else
526                 {
527                         /*
528                          * We assume page LSN is first data on *every* page that can be
529                          * passed to XLogInsert, whether it has the standard page layout
530                          * or not.
531                          */
532                         XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
533
534                         needs_backup = (page_lsn <= RedoRecPtr);
535                         if (!needs_backup)
536                         {
537                                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
538                                         *fpw_lsn = page_lsn;
539                         }
540                 }
541
542                 /* Determine if the buffer data needs to included */
543                 if (regbuf->rdata_len == 0)
544                         needs_data = false;
545                 else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
546                         needs_data = true;
547                 else
548                         needs_data = !needs_backup;
549
550                 bkpb.id = block_id;
551                 bkpb.fork_flags = regbuf->forkno;
552                 bkpb.data_length = 0;
553
554                 if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
555                         bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
556
557                 if (needs_backup)
558                 {
559                         Page            page = regbuf->page;
560                         uint16          compressed_len;
561
562                         /*
563                          * The page needs to be backed up, so calculate its hole length
564                          * and offset.
565                          */
566                         if (regbuf->flags & REGBUF_STANDARD)
567                         {
568                                 /* Assume we can omit data between pd_lower and pd_upper */
569                                 uint16          lower = ((PageHeader) page)->pd_lower;
570                                 uint16          upper = ((PageHeader) page)->pd_upper;
571
572                                 if (lower >= SizeOfPageHeaderData &&
573                                         upper > lower &&
574                                         upper <= BLCKSZ)
575                                 {
576                                         bimg.hole_offset = lower;
577                                         cbimg.hole_length = upper - lower;
578                                 }
579                                 else
580                                 {
581                                         /* No "hole" to compress out */
582                                         bimg.hole_offset = 0;
583                                         cbimg.hole_length = 0;
584                                 }
585                         }
586                         else
587                         {
588                                 /* Not a standard page header, don't try to eliminate "hole" */
589                                 bimg.hole_offset = 0;
590                                 cbimg.hole_length = 0;
591                         }
592
593                         /*
594                          * Try to compress a block image if wal_compression is enabled
595                          */
596                         if (wal_compression)
597                         {
598                                 is_compressed =
599                                         XLogCompressBackupBlock(page, bimg.hole_offset,
600                                                                                         cbimg.hole_length,
601                                                                                         regbuf->compressed_page,
602                                                                                         &compressed_len);
603                         }
604
605                         /*
606                          * Fill in the remaining fields in the XLogRecordBlockHeader
607                          * struct
608                          */
609                         bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
610
611                         /*
612                          * Construct XLogRecData entries for the page content.
613                          */
614                         rdt_datas_last->next = &regbuf->bkp_rdatas[0];
615                         rdt_datas_last = rdt_datas_last->next;
616
617                         bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
618
619                         if (is_compressed)
620                         {
621                                 bimg.length = compressed_len;
622                                 bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
623
624                                 rdt_datas_last->data = regbuf->compressed_page;
625                                 rdt_datas_last->len = compressed_len;
626                         }
627                         else
628                         {
629                                 bimg.length = BLCKSZ - cbimg.hole_length;
630
631                                 if (cbimg.hole_length == 0)
632                                 {
633                                         rdt_datas_last->data = page;
634                                         rdt_datas_last->len = BLCKSZ;
635                                 }
636                                 else
637                                 {
638                                         /* must skip the hole */
639                                         rdt_datas_last->data = page;
640                                         rdt_datas_last->len = bimg.hole_offset;
641
642                                         rdt_datas_last->next = &regbuf->bkp_rdatas[1];
643                                         rdt_datas_last = rdt_datas_last->next;
644
645                                         rdt_datas_last->data =
646                                                 page + (bimg.hole_offset + cbimg.hole_length);
647                                         rdt_datas_last->len =
648                                                 BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
649                                 }
650                         }
651
652                         total_len += bimg.length;
653                 }
654
655                 if (needs_data)
656                 {
657                         /*
658                          * Link the caller-supplied rdata chain for this buffer to the
659                          * overall list.
660                          */
661                         bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
662                         bkpb.data_length = regbuf->rdata_len;
663                         total_len += regbuf->rdata_len;
664
665                         rdt_datas_last->next = regbuf->rdata_head;
666                         rdt_datas_last = regbuf->rdata_tail;
667                 }
668
669                 if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
670                 {
671                         samerel = true;
672                         bkpb.fork_flags |= BKPBLOCK_SAME_REL;
673                 }
674                 else
675                         samerel = false;
676                 prev_regbuf = regbuf;
677
678                 /* Ok, copy the header to the scratch buffer */
679                 memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
680                 scratch += SizeOfXLogRecordBlockHeader;
681                 if (needs_backup)
682                 {
683                         memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
684                         scratch += SizeOfXLogRecordBlockImageHeader;
685                         if (cbimg.hole_length != 0 && is_compressed)
686                         {
687                                 memcpy(scratch, &cbimg,
688                                            SizeOfXLogRecordBlockCompressHeader);
689                                 scratch += SizeOfXLogRecordBlockCompressHeader;
690                         }
691                 }
692                 if (!samerel)
693                 {
694                         memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
695                         scratch += sizeof(RelFileNode);
696                 }
697                 memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
698                 scratch += sizeof(BlockNumber);
699         }
700
701         /* followed by the record's origin, if any */
702         if (include_origin && replorigin_sesssion_origin != InvalidRepOriginId)
703         {
704                 *(scratch++) = XLR_BLOCK_ID_ORIGIN;
705                 memcpy(scratch, &replorigin_sesssion_origin, sizeof(replorigin_sesssion_origin));
706                 scratch += sizeof(replorigin_sesssion_origin);
707         }
708
709         /* followed by main data, if any */
710         if (mainrdata_len > 0)
711         {
712                 if (mainrdata_len > 255)
713                 {
714                         *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
715                         memcpy(scratch, &mainrdata_len, sizeof(uint32));
716                         scratch += sizeof(uint32);
717                 }
718                 else
719                 {
720                         *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
721                         *(scratch++) = (uint8) mainrdata_len;
722                 }
723                 rdt_datas_last->next = mainrdata_head;
724                 rdt_datas_last = mainrdata_last;
725                 total_len += mainrdata_len;
726         }
727         rdt_datas_last->next = NULL;
728
729         hdr_rdt.len = (scratch - hdr_scratch);
730         total_len += hdr_rdt.len;
731
732         /*
733          * Calculate CRC of the data
734          *
735          * Note that the record header isn't added into the CRC initially since we
736          * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
737          * the whole record in the order: rdata, then backup blocks, then record
738          * header.
739          */
740         INIT_CRC32C(rdata_crc);
741         COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
742         for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
743                 COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
744
745         /*
746          * Fill in the fields in the record header. Prev-link is filled in later,
747          * once we know where in the WAL the record will be inserted. The CRC does
748          * not include the record header yet.
749          */
750         rechdr->xl_xid = GetCurrentTransactionIdIfAny();
751         rechdr->xl_tot_len = total_len;
752         rechdr->xl_info = info;
753         rechdr->xl_rmid = rmid;
754         rechdr->xl_prev = InvalidXLogRecPtr;
755         rechdr->xl_crc = rdata_crc;
756
757         return &hdr_rdt;
758 }
759
760 /*
761  * Create a compressed version of a backup block image.
762  *
763  * Returns FALSE if compression fails (i.e., compressed result is actually
764  * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
765  * the length of compressed block image.
766  */
767 static bool
768 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
769                                                 char *dest, uint16 *dlen)
770 {
771         int32           orig_len = BLCKSZ - hole_length;
772         int32           len;
773         int32           extra_bytes = 0;
774         char       *source;
775         char            tmp[BLCKSZ];
776
777         if (hole_length != 0)
778         {
779                 /* must skip the hole */
780                 source = tmp;
781                 memcpy(source, page, hole_offset);
782                 memcpy(source + hole_offset,
783                            page + (hole_offset + hole_length),
784                            BLCKSZ - (hole_length + hole_offset));
785
786                 /*
787                  * Extra data needs to be stored in WAL record for the compressed
788                  * version of block image if the hole exists.
789                  */
790                 extra_bytes = SizeOfXLogRecordBlockCompressHeader;
791         }
792         else
793                 source = page;
794
795         /*
796          * We recheck the actual size even if pglz_compress() reports success and
797          * see if the number of bytes saved by compression is larger than the
798          * length of extra data needed for the compressed version of block image.
799          */
800         len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
801         if (len >= 0 &&
802                 len + extra_bytes < orig_len)
803         {
804                 *dlen = (uint16) len;   /* successful compression */
805                 return true;
806         }
807         return false;
808 }
809
810 /*
811  * Determine whether the buffer referenced has to be backed up.
812  *
813  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
814  * could change later, so the result should be used for optimization purposes
815  * only.
816  */
817 bool
818 XLogCheckBufferNeedsBackup(Buffer buffer)
819 {
820         XLogRecPtr      RedoRecPtr;
821         bool            doPageWrites;
822         Page            page;
823
824         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
825
826         page = BufferGetPage(buffer);
827
828         if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
829                 return true;                    /* buffer requires backup */
830
831         return false;                           /* buffer does not need to be backed up */
832 }
833
834 /*
835  * Write a backup block if needed when we are setting a hint. Note that
836  * this may be called for a variety of page types, not just heaps.
837  *
838  * Callable while holding just share lock on the buffer content.
839  *
840  * We can't use the plain backup block mechanism since that relies on the
841  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
842  * bits) are allowed in a sharelocked buffer that can lead to wal checksum
843  * failures. So instead we copy the page and insert the copied data as normal
844  * record data.
845  *
846  * We only need to do something if page has not yet been full page written in
847  * this checkpoint round. The LSN of the inserted wal record is returned if we
848  * had to write, InvalidXLogRecPtr otherwise.
849  *
850  * It is possible that multiple concurrent backends could attempt to write WAL
851  * records. In that case, multiple copies of the same block would be recorded
852  * in separate WAL records by different backends, though that is still OK from
853  * a correctness perspective.
854  */
855 XLogRecPtr
856 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
857 {
858         XLogRecPtr      recptr = InvalidXLogRecPtr;
859         XLogRecPtr      lsn;
860         XLogRecPtr      RedoRecPtr;
861
862         /*
863          * Ensure no checkpoint can change our view of RedoRecPtr.
864          */
865         Assert(MyPgXact->delayChkpt);
866
867         /*
868          * Update RedoRecPtr so that we can make the right decision
869          */
870         RedoRecPtr = GetRedoRecPtr();
871
872         /*
873          * We assume page LSN is first data on *every* page that can be passed to
874          * XLogInsert, whether it has the standard page layout or not. Since we're
875          * only holding a share-lock on the page, we must take the buffer header
876          * lock when we look at the LSN.
877          */
878         lsn = BufferGetLSNAtomic(buffer);
879
880         if (lsn <= RedoRecPtr)
881         {
882                 int                     flags;
883                 char            copied_buffer[BLCKSZ];
884                 char       *origdata = (char *) BufferGetBlock(buffer);
885                 RelFileNode rnode;
886                 ForkNumber      forkno;
887                 BlockNumber blkno;
888
889                 /*
890                  * Copy buffer so we don't have to worry about concurrent hint bit or
891                  * lsn updates. We assume pd_lower/upper cannot be changed without an
892                  * exclusive lock, so the contents bkp are not racy.
893                  */
894                 if (buffer_std)
895                 {
896                         /* Assume we can omit data between pd_lower and pd_upper */
897                         Page            page = BufferGetPage(buffer);
898                         uint16          lower = ((PageHeader) page)->pd_lower;
899                         uint16          upper = ((PageHeader) page)->pd_upper;
900
901                         memcpy(copied_buffer, origdata, lower);
902                         memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
903                 }
904                 else
905                         memcpy(copied_buffer, origdata, BLCKSZ);
906
907                 XLogBeginInsert();
908
909                 flags = REGBUF_FORCE_IMAGE;
910                 if (buffer_std)
911                         flags |= REGBUF_STANDARD;
912
913                 BufferGetTag(buffer, &rnode, &forkno, &blkno);
914                 XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
915
916                 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
917         }
918
919         return recptr;
920 }
921
922 /*
923  * Write a WAL record containing a full image of a page. Caller is responsible
924  * for writing the page to disk after calling this routine.
925  *
926  * Note: If you're using this function, you should be building pages in private
927  * memory and writing them directly to smgr.  If you're using buffers, call
928  * log_newpage_buffer instead.
929  *
930  * If the page follows the standard page layout, with a PageHeader and unused
931  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
932  * the unused space to be left out from the WAL record, making it smaller.
933  */
934 XLogRecPtr
935 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
936                         Page page, bool page_std)
937 {
938         int                     flags;
939         XLogRecPtr      recptr;
940
941         flags = REGBUF_FORCE_IMAGE;
942         if (page_std)
943                 flags |= REGBUF_STANDARD;
944
945         XLogBeginInsert();
946         XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
947         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
948
949         /*
950          * The page may be uninitialized. If so, we can't set the LSN because that
951          * would corrupt the page.
952          */
953         if (!PageIsNew(page))
954         {
955                 PageSetLSN(page, recptr);
956         }
957
958         return recptr;
959 }
960
961 /*
962  * Write a WAL record containing a full image of a page.
963  *
964  * Caller should initialize the buffer and mark it dirty before calling this
965  * function.  This function will set the page LSN.
966  *
967  * If the page follows the standard page layout, with a PageHeader and unused
968  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
969  * the unused space to be left out from the WAL record, making it smaller.
970  */
971 XLogRecPtr
972 log_newpage_buffer(Buffer buffer, bool page_std)
973 {
974         Page            page = BufferGetPage(buffer);
975         RelFileNode rnode;
976         ForkNumber      forkNum;
977         BlockNumber blkno;
978
979         /* Shared buffers should be modified in a critical section. */
980         Assert(CritSectionCount > 0);
981
982         BufferGetTag(buffer, &rnode, &forkNum, &blkno);
983
984         return log_newpage(&rnode, forkNum, blkno, page, page_std);
985 }
986
987 /*
988  * Allocate working buffers needed for WAL record construction.
989  */
990 void
991 InitXLogInsert(void)
992 {
993         /* Initialize the working areas */
994         if (xloginsert_cxt == NULL)
995         {
996                 xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
997                                                                                            "WAL record construction",
998                                                                                            ALLOCSET_DEFAULT_MINSIZE,
999                                                                                            ALLOCSET_DEFAULT_INITSIZE,
1000                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
1001         }
1002
1003         if (registered_buffers == NULL)
1004         {
1005                 registered_buffers = (registered_buffer *)
1006                         MemoryContextAllocZero(xloginsert_cxt,
1007                                   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1008                 max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1009         }
1010         if (rdatas == NULL)
1011         {
1012                 rdatas = MemoryContextAlloc(xloginsert_cxt,
1013                                                                         sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1014                 max_rdatas = XLR_NORMAL_RDATAS;
1015         }
1016
1017         /*
1018          * Allocate a buffer to hold the header information for a WAL record.
1019          */
1020         if (hdr_scratch == NULL)
1021                 hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1022                                                                                          HEADER_SCRATCH_SIZE);
1023 }