]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xloginsert.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / transam / xloginsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * xloginsert.c
4  *              Functions for constructing WAL records
5  *
6  * Constructing a WAL record begins with a call to XLogBeginInsert,
7  * followed by a number of XLogRegister* calls. The registered data is
8  * collected in private working memory, and finally assembled into a chain
9  * of XLogRecData structs by a call to XLogRecordAssemble(). See
10  * access/transam/README for details.
11  *
12  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * src/backend/access/transam/xloginsert.c
16  *
17  *-------------------------------------------------------------------------
18  */
19
20 #include "postgres.h"
21
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "miscadmin.h"
29 #include "storage/bufmgr.h"
30 #include "storage/proc.h"
31 #include "utils/memutils.h"
32 #include "pg_trace.h"
33
34 /* Buffer size required to store a compressed version of backup block image */
35 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
36
37 /*
38  * For each block reference registered with XLogRegisterBuffer, we fill in
39  * a registered_buffer struct.
40  */
41 typedef struct
42 {
43         bool            in_use;                 /* is this slot in use? */
44         uint8           flags;                  /* REGBUF_* flags */
45         RelFileNode rnode;                      /* identifies the relation and block */
46         ForkNumber      forkno;
47         BlockNumber block;
48         Page            page;                   /* page content */
49         uint32          rdata_len;              /* total length of data in rdata chain */
50         XLogRecData *rdata_head;        /* head of the chain of data registered with
51                                                                  * this block */
52         XLogRecData *rdata_tail;        /* last entry in the chain, or &rdata_head if
53                                                                  * empty */
54
55         XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
56                                                                  * backup block data in XLogRecordAssemble() */
57
58         /* buffer to store a compressed version of backup block image */
59         char            compressed_page[PGLZ_MAX_BLCKSZ];
60 }       registered_buffer;
61
62 static registered_buffer *registered_buffers;
63 static int      max_registered_buffers;         /* allocated size */
64 static int      max_registered_block_id = 0;            /* highest block_id + 1
65                                                                                                  * currently registered */
66
67 /*
68  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
69  * with XLogRegisterData(...).
70  */
71 static XLogRecData *mainrdata_head;
72 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
73 static uint32 mainrdata_len;    /* total # of bytes in chain */
74
75 /*
76  * These are used to hold the record header while constructing a record.
77  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
78  * because we want it to be MAXALIGNed and padding bytes zeroed.
79  *
80  * For simplicity, it's allocated large enough to hold the headers for any
81  * WAL record.
82  */
83 static XLogRecData hdr_rdt;
84 static char *hdr_scratch = NULL;
85
86 #define HEADER_SCRATCH_SIZE \
87         (SizeOfXLogRecord + \
88          MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
89          SizeOfXLogRecordDataHeaderLong)
90
91 /*
92  * An array of XLogRecData structs, to hold registered data.
93  */
94 static XLogRecData *rdatas;
95 static int      num_rdatas;                     /* entries currently used */
96 static int      max_rdatas;                     /* allocated size */
97
98 static bool begininsert_called = false;
99
100 /* Memory context to hold the registered buffer and data references. */
101 static MemoryContext xloginsert_cxt;
102
103 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
104                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
105                                    XLogRecPtr *fpw_lsn);
106 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
107                                                                         uint16 hole_length, char *dest, uint16 *dlen);
108
109 /*
110  * Begin constructing a WAL record. This must be called before the
111  * XLogRegister* functions and XLogInsert().
112  */
113 void
114 XLogBeginInsert(void)
115 {
116         Assert(max_registered_block_id == 0);
117         Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
118         Assert(mainrdata_len == 0);
119         Assert(!begininsert_called);
120
121         /* cross-check on whether we should be here or not */
122         if (!XLogInsertAllowed())
123                 elog(ERROR, "cannot make new WAL entries during recovery");
124
125         begininsert_called = true;
126 }
127
128 /*
129  * Ensure that there are enough buffer and data slots in the working area,
130  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
131  * calls.
132  *
133  * There is always space for a small number of buffers and data chunks, enough
134  * for most record types. This function is for the exceptional cases that need
135  * more.
136  */
137 void
138 XLogEnsureRecordSpace(int max_block_id, int ndatas)
139 {
140         int                     nbuffers;
141
142         /*
143          * This must be called before entering a critical section, because
144          * allocating memory inside a critical section can fail. repalloc() will
145          * check the same, but better to check it here too so that we fail
146          * consistently even if the arrays happen to be large enough already.
147          */
148         Assert(CritSectionCount == 0);
149
150         /* the minimum values can't be decreased */
151         if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
152                 max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
153         if (ndatas < XLR_NORMAL_RDATAS)
154                 ndatas = XLR_NORMAL_RDATAS;
155
156         if (max_block_id > XLR_MAX_BLOCK_ID)
157                 elog(ERROR, "maximum number of WAL record block references exceeded");
158         nbuffers = max_block_id + 1;
159
160         if (nbuffers > max_registered_buffers)
161         {
162                 registered_buffers = (registered_buffer *)
163                         repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
164
165                 /*
166                  * At least the padding bytes in the structs must be zeroed, because
167                  * they are included in WAL data, but initialize it all for tidiness.
168                  */
169                 MemSet(&registered_buffers[max_registered_buffers], 0,
170                         (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
171                 max_registered_buffers = nbuffers;
172         }
173
174         if (ndatas > max_rdatas)
175         {
176                 rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
177                 max_rdatas = ndatas;
178         }
179 }
180
181 /*
182  * Reset WAL record construction buffers.
183  */
184 void
185 XLogResetInsertion(void)
186 {
187         int                     i;
188
189         for (i = 0; i < max_registered_block_id; i++)
190                 registered_buffers[i].in_use = false;
191
192         num_rdatas = 0;
193         max_registered_block_id = 0;
194         mainrdata_len = 0;
195         mainrdata_last = (XLogRecData *) &mainrdata_head;
196         begininsert_called = false;
197 }
198
199 /*
200  * Register a reference to a buffer with the WAL record being constructed.
201  * This must be called for every page that the WAL-logged operation modifies.
202  */
203 void
204 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
205 {
206         registered_buffer *regbuf;
207
208         /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
209         Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
210         Assert(begininsert_called);
211
212         if (block_id >= max_registered_block_id)
213         {
214                 if (block_id >= max_registered_buffers)
215                         elog(ERROR, "too many registered buffers");
216                 max_registered_block_id = block_id + 1;
217         }
218
219         regbuf = &registered_buffers[block_id];
220
221         BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
222         regbuf->page = BufferGetPage(buffer);
223         regbuf->flags = flags;
224         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
225         regbuf->rdata_len = 0;
226
227         /*
228          * Check that this page hasn't already been registered with some other
229          * block_id.
230          */
231 #ifdef USE_ASSERT_CHECKING
232         {
233                 int                     i;
234
235                 for (i = 0; i < max_registered_block_id; i++)
236                 {
237                         registered_buffer *regbuf_old = &registered_buffers[i];
238
239                         if (i == block_id || !regbuf_old->in_use)
240                                 continue;
241
242                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
243                                    regbuf_old->forkno != regbuf->forkno ||
244                                    regbuf_old->block != regbuf->block);
245                 }
246         }
247 #endif
248
249         regbuf->in_use = true;
250 }
251
252 /*
253  * Like XLogRegisterBuffer, but for registering a block that's not in the
254  * shared buffer pool (i.e. when you don't have a Buffer for it).
255  */
256 void
257 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
258                                   BlockNumber blknum, Page page, uint8 flags)
259 {
260         registered_buffer *regbuf;
261
262         /* This is currently only used to WAL-log a full-page image of a page */
263         Assert(flags & REGBUF_FORCE_IMAGE);
264         Assert(begininsert_called);
265
266         if (block_id >= max_registered_block_id)
267                 max_registered_block_id = block_id + 1;
268
269         if (block_id >= max_registered_buffers)
270                 elog(ERROR, "too many registered buffers");
271
272         regbuf = &registered_buffers[block_id];
273
274         regbuf->rnode = *rnode;
275         regbuf->forkno = forknum;
276         regbuf->block = blknum;
277         regbuf->page = page;
278         regbuf->flags = flags;
279         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
280         regbuf->rdata_len = 0;
281
282         /*
283          * Check that this page hasn't already been registered with some other
284          * block_id.
285          */
286 #ifdef USE_ASSERT_CHECKING
287         {
288                 int                     i;
289
290                 for (i = 0; i < max_registered_block_id; i++)
291                 {
292                         registered_buffer *regbuf_old = &registered_buffers[i];
293
294                         if (i == block_id || !regbuf_old->in_use)
295                                 continue;
296
297                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
298                                    regbuf_old->forkno != regbuf->forkno ||
299                                    regbuf_old->block != regbuf->block);
300                 }
301         }
302 #endif
303
304         regbuf->in_use = true;
305 }
306
307 /*
308  * Add data to the WAL record that's being constructed.
309  *
310  * The data is appended to the "main chunk", available at replay with
311  * XLogRecGetData().
312  */
313 void
314 XLogRegisterData(char *data, int len)
315 {
316         XLogRecData *rdata;
317
318         Assert(begininsert_called);
319
320         if (num_rdatas >= max_rdatas)
321                 elog(ERROR, "too much WAL data");
322         rdata = &rdatas[num_rdatas++];
323
324         rdata->data = data;
325         rdata->len = len;
326
327         /*
328          * we use the mainrdata_last pointer to track the end of the chain, so no
329          * need to clear 'next' here.
330          */
331
332         mainrdata_last->next = rdata;
333         mainrdata_last = rdata;
334
335         mainrdata_len += len;
336 }
337
338 /*
339  * Add buffer-specific data to the WAL record that's being constructed.
340  *
341  * Block_id must reference a block previously registered with
342  * XLogRegisterBuffer(). If this is called more than once for the same
343  * block_id, the data is appended.
344  *
345  * The maximum amount of data that can be registered per block is 65535
346  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
347  * reconstruct the changes to the page, you might as well just log a full
348  * copy of it. (the "main data" that's not associated with a block is not
349  * limited)
350  */
351 void
352 XLogRegisterBufData(uint8 block_id, char *data, int len)
353 {
354         registered_buffer *regbuf;
355         XLogRecData *rdata;
356
357         Assert(begininsert_called);
358
359         /* find the registered buffer struct */
360         regbuf = &registered_buffers[block_id];
361         if (!regbuf->in_use)
362                 elog(ERROR, "no block with id %d registered with WAL insertion",
363                          block_id);
364
365         if (num_rdatas >= max_rdatas)
366                 elog(ERROR, "too much WAL data");
367         rdata = &rdatas[num_rdatas++];
368
369         rdata->data = data;
370         rdata->len = len;
371
372         regbuf->rdata_tail->next = rdata;
373         regbuf->rdata_tail = rdata;
374         regbuf->rdata_len += len;
375 }
376
377 /*
378  * Insert an XLOG record having the specified RMID and info bytes, with the
379  * body of the record being the data and buffer references registered earlier
380  * with XLogRegister* calls.
381  *
382  * Returns XLOG pointer to end of record (beginning of next record).
383  * This can be used as LSN for data pages affected by the logged action.
384  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
385  * before the data page can be written out.  This implements the basic
386  * WAL rule "write the log before the data".)
387  */
388 XLogRecPtr
389 XLogInsert(RmgrId rmid, uint8 info)
390 {
391         XLogRecPtr      EndPos;
392
393         /* XLogBeginInsert() must have been called. */
394         if (!begininsert_called)
395                 elog(ERROR, "XLogBeginInsert was not called");
396
397         /*
398          * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
399          * reserved for use by me.
400          */
401         if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
402                 elog(PANIC, "invalid xlog info mask %02X", info);
403
404         TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
405
406         /*
407          * In bootstrap mode, we don't actually log anything but XLOG resources;
408          * return a phony record pointer.
409          */
410         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
411         {
412                 XLogResetInsertion();
413                 EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
414                 return EndPos;
415         }
416
417         do
418         {
419                 XLogRecPtr      RedoRecPtr;
420                 bool            doPageWrites;
421                 XLogRecPtr      fpw_lsn;
422                 XLogRecData *rdt;
423
424                 /*
425                  * Get values needed to decide whether to do full-page writes. Since
426                  * we don't yet have an insertion lock, these could change under us,
427                  * but XLogInsertRecData will recheck them once it has a lock.
428                  */
429                 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
430
431                 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
432                                                                  &fpw_lsn);
433
434                 EndPos = XLogInsertRecord(rdt, fpw_lsn);
435         } while (EndPos == InvalidXLogRecPtr);
436
437         XLogResetInsertion();
438
439         return EndPos;
440 }
441
442 /*
443  * Assemble a WAL record from the registered data and buffers into an
444  * XLogRecData chain, ready for insertion with XLogInsertRecord().
445  *
446  * The record header fields are filled in, except for the xl_prev field. The
447  * calculated CRC does not include the record header yet.
448  *
449  * If there are any registered buffers, and a full-page image was not taken
450  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
451  * signals that the assembled record is only good for insertion on the
452  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
453  */
454 static XLogRecData *
455 XLogRecordAssemble(RmgrId rmid, uint8 info,
456                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
457                                    XLogRecPtr *fpw_lsn)
458 {
459         XLogRecData *rdt;
460         uint32          total_len = 0;
461         int                     block_id;
462         pg_crc32        rdata_crc;
463         registered_buffer *prev_regbuf = NULL;
464         XLogRecData *rdt_datas_last;
465         XLogRecord *rechdr;
466         char       *scratch = hdr_scratch;
467
468         /*
469          * Note: this function can be called multiple times for the same record.
470          * All the modifications we do to the rdata chains below must handle that.
471          */
472
473         /* The record begins with the fixed-size header */
474         rechdr = (XLogRecord *) scratch;
475         scratch += SizeOfXLogRecord;
476
477         hdr_rdt.next = NULL;
478         rdt_datas_last = &hdr_rdt;
479         hdr_rdt.data = hdr_scratch;
480
481         /*
482          * Make an rdata chain containing all the data portions of all block
483          * references. This includes the data for full-page images. Also append
484          * the headers for the block references in the scratch buffer.
485          */
486         *fpw_lsn = InvalidXLogRecPtr;
487         for (block_id = 0; block_id < max_registered_block_id; block_id++)
488         {
489                 registered_buffer *regbuf = &registered_buffers[block_id];
490                 bool            needs_backup;
491                 bool            needs_data;
492                 XLogRecordBlockHeader bkpb;
493                 XLogRecordBlockImageHeader bimg;
494                 XLogRecordBlockCompressHeader cbimg = {0};
495                 bool            samerel;
496                 bool            is_compressed = false;
497
498                 if (!regbuf->in_use)
499                         continue;
500
501                 /* Determine if this block needs to be backed up */
502                 if (regbuf->flags & REGBUF_FORCE_IMAGE)
503                         needs_backup = true;
504                 else if (regbuf->flags & REGBUF_NO_IMAGE)
505                         needs_backup = false;
506                 else if (!doPageWrites)
507                         needs_backup = false;
508                 else
509                 {
510                         /*
511                          * We assume page LSN is first data on *every* page that can be
512                          * passed to XLogInsert, whether it has the standard page layout
513                          * or not.
514                          */
515                         XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
516
517                         needs_backup = (page_lsn <= RedoRecPtr);
518                         if (!needs_backup)
519                         {
520                                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
521                                         *fpw_lsn = page_lsn;
522                         }
523                 }
524
525                 /* Determine if the buffer data needs to included */
526                 if (regbuf->rdata_len == 0)
527                         needs_data = false;
528                 else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
529                         needs_data = true;
530                 else
531                         needs_data = !needs_backup;
532
533                 bkpb.id = block_id;
534                 bkpb.fork_flags = regbuf->forkno;
535                 bkpb.data_length = 0;
536
537                 if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
538                         bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
539
540                 if (needs_backup)
541                 {
542                         Page            page = regbuf->page;
543                         uint16          compressed_len;
544
545                         /*
546                          * The page needs to be backed up, so calculate its hole length
547                          * and offset.
548                          */
549                         if (regbuf->flags & REGBUF_STANDARD)
550                         {
551                                 /* Assume we can omit data between pd_lower and pd_upper */
552                                 uint16          lower = ((PageHeader) page)->pd_lower;
553                                 uint16          upper = ((PageHeader) page)->pd_upper;
554
555                                 if (lower >= SizeOfPageHeaderData &&
556                                         upper > lower &&
557                                         upper <= BLCKSZ)
558                                 {
559                                         bimg.hole_offset = lower;
560                                         cbimg.hole_length = upper - lower;
561                                 }
562                                 else
563                                 {
564                                         /* No "hole" to compress out */
565                                         bimg.hole_offset = 0;
566                                         cbimg.hole_length = 0;
567                                 }
568                         }
569                         else
570                         {
571                                 /* Not a standard page header, don't try to eliminate "hole" */
572                                 bimg.hole_offset = 0;
573                                 cbimg.hole_length = 0;
574                         }
575
576                         /*
577                          * Try to compress a block image if wal_compression is enabled
578                          */
579                         if (wal_compression)
580                         {
581                                 is_compressed =
582                                         XLogCompressBackupBlock(page, bimg.hole_offset,
583                                                                                         cbimg.hole_length,
584                                                                                         regbuf->compressed_page,
585                                                                                         &compressed_len);
586                         }
587
588                         /* Fill in the remaining fields in the XLogRecordBlockHeader struct */
589                         bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
590
591                         /*
592                          * Construct XLogRecData entries for the page content.
593                          */
594                         rdt_datas_last->next = &regbuf->bkp_rdatas[0];
595                         rdt_datas_last = rdt_datas_last->next;
596
597                         bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
598
599                         if (is_compressed)
600                         {
601                                 bimg.length = compressed_len;
602                                 bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
603
604                                 rdt_datas_last->data = regbuf->compressed_page;
605                                 rdt_datas_last->len = compressed_len;
606                         }
607                         else
608                         {
609                                 bimg.length = BLCKSZ - cbimg.hole_length;
610
611                                 if (cbimg.hole_length == 0)
612                                 {
613                                         rdt_datas_last->data = page;
614                                         rdt_datas_last->len = BLCKSZ;
615                                 }
616                                 else
617                                 {
618                                         /* must skip the hole */
619                                         rdt_datas_last->data = page;
620                                         rdt_datas_last->len = bimg.hole_offset;
621
622                                         rdt_datas_last->next = &regbuf->bkp_rdatas[1];
623                                         rdt_datas_last = rdt_datas_last->next;
624
625                                         rdt_datas_last->data =
626                                                 page + (bimg.hole_offset + cbimg.hole_length);
627                                         rdt_datas_last->len =
628                                                 BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
629                                 }
630                         }
631
632                         total_len += bimg.length;
633                 }
634
635                 if (needs_data)
636                 {
637                         /*
638                          * Link the caller-supplied rdata chain for this buffer to the
639                          * overall list.
640                          */
641                         bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
642                         bkpb.data_length = regbuf->rdata_len;
643                         total_len += regbuf->rdata_len;
644
645                         rdt_datas_last->next = regbuf->rdata_head;
646                         rdt_datas_last = regbuf->rdata_tail;
647                 }
648
649                 if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
650                 {
651                         samerel = true;
652                         bkpb.fork_flags |= BKPBLOCK_SAME_REL;
653                         prev_regbuf = regbuf;
654                 }
655                 else
656                         samerel = false;
657
658                 /* Ok, copy the header to the scratch buffer */
659                 memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
660                 scratch += SizeOfXLogRecordBlockHeader;
661                 if (needs_backup)
662                 {
663                         memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
664                         scratch += SizeOfXLogRecordBlockImageHeader;
665                         if (cbimg.hole_length != 0 && is_compressed)
666                         {
667                                 memcpy(scratch, &cbimg,
668                                            SizeOfXLogRecordBlockCompressHeader);
669                                 scratch += SizeOfXLogRecordBlockCompressHeader;
670                         }
671                 }
672                 if (!samerel)
673                 {
674                         memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
675                         scratch += sizeof(RelFileNode);
676                 }
677                 memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
678                 scratch += sizeof(BlockNumber);
679         }
680
681         /* followed by main data, if any */
682         if (mainrdata_len > 0)
683         {
684                 if (mainrdata_len > 255)
685                 {
686                         *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
687                         memcpy(scratch, &mainrdata_len, sizeof(uint32));
688                         scratch += sizeof(uint32);
689                 }
690                 else
691                 {
692                         *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
693                         *(scratch++) = (uint8) mainrdata_len;
694                 }
695                 rdt_datas_last->next = mainrdata_head;
696                 rdt_datas_last = mainrdata_last;
697                 total_len += mainrdata_len;
698         }
699         rdt_datas_last->next = NULL;
700
701         hdr_rdt.len = (scratch - hdr_scratch);
702         total_len += hdr_rdt.len;
703
704         /*
705          * Calculate CRC of the data
706          *
707          * Note that the record header isn't added into the CRC initially since we
708          * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
709          * the whole record in the order: rdata, then backup blocks, then record
710          * header.
711          */
712         INIT_CRC32C(rdata_crc);
713         COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
714         for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
715                 COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
716
717         /*
718          * Fill in the fields in the record header. Prev-link is filled in later,
719          * once we know where in the WAL the record will be inserted. The CRC does
720          * not include the record header yet.
721          */
722         rechdr->xl_xid = GetCurrentTransactionIdIfAny();
723         rechdr->xl_tot_len = total_len;
724         rechdr->xl_info = info;
725         rechdr->xl_rmid = rmid;
726         rechdr->xl_prev = InvalidXLogRecPtr;
727         rechdr->xl_crc = rdata_crc;
728
729         return &hdr_rdt;
730 }
731
732 /*
733  * Create a compressed version of a backup block image.
734  *
735  * Returns FALSE if compression fails (i.e., compressed result is actually
736  * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
737  * the length of compressed block image.
738  */
739 static bool
740 XLogCompressBackupBlock(char * page, uint16 hole_offset, uint16 hole_length,
741                                                 char *dest, uint16 *dlen)
742 {
743         int32           orig_len = BLCKSZ - hole_length;
744         int32           len;
745         int32           extra_bytes = 0;
746         char       *source;
747         char            tmp[BLCKSZ];
748
749         if (hole_length != 0)
750         {
751                 /* must skip the hole */
752                 source = tmp;
753                 memcpy(source, page, hole_offset);
754                 memcpy(source + hole_offset,
755                            page + (hole_offset + hole_length),
756                            BLCKSZ - (hole_length + hole_offset));
757
758                 /*
759                  * Extra data needs to be stored in WAL record for the compressed
760                  * version of block image if the hole exists.
761                  */
762                 extra_bytes = SizeOfXLogRecordBlockCompressHeader;
763         }
764         else
765                 source = page;
766
767         /*
768          * We recheck the actual size even if pglz_compress() reports success
769          * and see if the number of bytes saved by compression is larger than
770          * the length of extra data needed for the compressed version of block
771          * image.
772          */
773         len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
774         if (len >= 0 &&
775                 len + extra_bytes < orig_len)
776         {
777                 *dlen = (uint16) len;           /* successful compression */
778                 return true;
779         }
780         return false;
781 }
782
783 /*
784  * Determine whether the buffer referenced has to be backed up.
785  *
786  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
787  * could change later, so the result should be used for optimization purposes
788  * only.
789  */
790 bool
791 XLogCheckBufferNeedsBackup(Buffer buffer)
792 {
793         XLogRecPtr      RedoRecPtr;
794         bool            doPageWrites;
795         Page            page;
796
797         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
798
799         page = BufferGetPage(buffer);
800
801         if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
802                 return true;                    /* buffer requires backup */
803
804         return false;                           /* buffer does not need to be backed up */
805 }
806
807 /*
808  * Write a backup block if needed when we are setting a hint. Note that
809  * this may be called for a variety of page types, not just heaps.
810  *
811  * Callable while holding just share lock on the buffer content.
812  *
813  * We can't use the plain backup block mechanism since that relies on the
814  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
815  * bits) are allowed in a sharelocked buffer that can lead to wal checksum
816  * failures. So instead we copy the page and insert the copied data as normal
817  * record data.
818  *
819  * We only need to do something if page has not yet been full page written in
820  * this checkpoint round. The LSN of the inserted wal record is returned if we
821  * had to write, InvalidXLogRecPtr otherwise.
822  *
823  * It is possible that multiple concurrent backends could attempt to write WAL
824  * records. In that case, multiple copies of the same block would be recorded
825  * in separate WAL records by different backends, though that is still OK from
826  * a correctness perspective.
827  */
828 XLogRecPtr
829 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
830 {
831         XLogRecPtr      recptr = InvalidXLogRecPtr;
832         XLogRecPtr      lsn;
833         XLogRecPtr      RedoRecPtr;
834
835         /*
836          * Ensure no checkpoint can change our view of RedoRecPtr.
837          */
838         Assert(MyPgXact->delayChkpt);
839
840         /*
841          * Update RedoRecPtr so that we can make the right decision
842          */
843         RedoRecPtr = GetRedoRecPtr();
844
845         /*
846          * We assume page LSN is first data on *every* page that can be passed to
847          * XLogInsert, whether it has the standard page layout or not. Since we're
848          * only holding a share-lock on the page, we must take the buffer header
849          * lock when we look at the LSN.
850          */
851         lsn = BufferGetLSNAtomic(buffer);
852
853         if (lsn <= RedoRecPtr)
854         {
855                 int                     flags;
856                 char            copied_buffer[BLCKSZ];
857                 char       *origdata = (char *) BufferGetBlock(buffer);
858                 RelFileNode rnode;
859                 ForkNumber      forkno;
860                 BlockNumber blkno;
861
862                 /*
863                  * Copy buffer so we don't have to worry about concurrent hint bit or
864                  * lsn updates. We assume pd_lower/upper cannot be changed without an
865                  * exclusive lock, so the contents bkp are not racy.
866                  */
867                 if (buffer_std)
868                 {
869                         /* Assume we can omit data between pd_lower and pd_upper */
870                         Page            page = BufferGetPage(buffer);
871                         uint16          lower = ((PageHeader) page)->pd_lower;
872                         uint16          upper = ((PageHeader) page)->pd_upper;
873
874                         memcpy(copied_buffer, origdata, lower);
875                         memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
876                 }
877                 else
878                         memcpy(copied_buffer, origdata, BLCKSZ);
879
880                 XLogBeginInsert();
881
882                 flags = REGBUF_FORCE_IMAGE;
883                 if (buffer_std)
884                         flags |= REGBUF_STANDARD;
885
886                 BufferGetTag(buffer, &rnode, &forkno, &blkno);
887                 XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
888
889                 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
890         }
891
892         return recptr;
893 }
894
895 /*
896  * Write a WAL record containing a full image of a page. Caller is responsible
897  * for writing the page to disk after calling this routine.
898  *
899  * Note: If you're using this function, you should be building pages in private
900  * memory and writing them directly to smgr.  If you're using buffers, call
901  * log_newpage_buffer instead.
902  *
903  * If the page follows the standard page layout, with a PageHeader and unused
904  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
905  * the unused space to be left out from the WAL record, making it smaller.
906  */
907 XLogRecPtr
908 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
909                         Page page, bool page_std)
910 {
911         int                     flags;
912         XLogRecPtr      recptr;
913
914         flags = REGBUF_FORCE_IMAGE;
915         if (page_std)
916                 flags |= REGBUF_STANDARD;
917
918         XLogBeginInsert();
919         XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
920         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
921
922         /*
923          * The page may be uninitialized. If so, we can't set the LSN because that
924          * would corrupt the page.
925          */
926         if (!PageIsNew(page))
927         {
928                 PageSetLSN(page, recptr);
929         }
930
931         return recptr;
932 }
933
934 /*
935  * Write a WAL record containing a full image of a page.
936  *
937  * Caller should initialize the buffer and mark it dirty before calling this
938  * function.  This function will set the page LSN.
939  *
940  * If the page follows the standard page layout, with a PageHeader and unused
941  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
942  * the unused space to be left out from the WAL record, making it smaller.
943  */
944 XLogRecPtr
945 log_newpage_buffer(Buffer buffer, bool page_std)
946 {
947         Page            page = BufferGetPage(buffer);
948         RelFileNode rnode;
949         ForkNumber      forkNum;
950         BlockNumber blkno;
951
952         /* Shared buffers should be modified in a critical section. */
953         Assert(CritSectionCount > 0);
954
955         BufferGetTag(buffer, &rnode, &forkNum, &blkno);
956
957         return log_newpage(&rnode, forkNum, blkno, page, page_std);
958 }
959
960 /*
961  * Allocate working buffers needed for WAL record construction.
962  */
963 void
964 InitXLogInsert(void)
965 {
966         /* Initialize the working areas */
967         if (xloginsert_cxt == NULL)
968         {
969                 xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
970                                                                                            "WAL record construction",
971                                                                                            ALLOCSET_DEFAULT_MINSIZE,
972                                                                                            ALLOCSET_DEFAULT_INITSIZE,
973                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
974         }
975
976         if (registered_buffers == NULL)
977         {
978                 registered_buffers = (registered_buffer *)
979                         MemoryContextAllocZero(xloginsert_cxt,
980                                   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
981                 max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
982         }
983         if (rdatas == NULL)
984         {
985                 rdatas = MemoryContextAlloc(xloginsert_cxt,
986                                                                         sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
987                 max_rdatas = XLR_NORMAL_RDATAS;
988         }
989
990         /*
991          * Allocate a buffer to hold the header information for a WAL record.
992          */
993         if (hdr_scratch == NULL)
994                 hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
995                                                                                          HEADER_SCRATCH_SIZE);
996 }