]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xloginsert.c
34e44e4f235a7b5f30e4168c3e5366246fe0060a
[postgresql] / src / backend / access / transam / xloginsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * xloginsert.c
4  *              Functions for constructing WAL records
5  *
6  * Constructing a WAL record begins with a call to XLogBeginInsert,
7  * followed by a number of XLogRegister* calls. The registered data is
8  * collected in private working memory, and finally assembled into a chain
9  * of XLogRecData structs by a call to XLogRecordAssemble(). See
10  * access/transam/README for details.
11  *
12  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * src/backend/access/transam/xloginsert.c
16  *
17  *-------------------------------------------------------------------------
18  */
19
20 #include "postgres.h"
21
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "miscadmin.h"
28 #include "storage/bufmgr.h"
29 #include "storage/proc.h"
30 #include "utils/memutils.h"
31 #include "pg_trace.h"
32
33 /*
34  * For each block reference registered with XLogRegisterBuffer, we fill in
35  * a registered_buffer struct.
36  */
37 typedef struct
38 {
39         bool            in_use;                 /* is this slot in use? */
40         uint8           flags;                  /* REGBUF_* flags */
41         RelFileNode rnode;                      /* identifies the relation and block */
42         ForkNumber      forkno;
43         BlockNumber block;
44         Page            page;                   /* page content */
45         uint32          rdata_len;              /* total length of data in rdata chain */
46         XLogRecData *rdata_head;        /* head of the chain of data registered with
47                                                                  * this block */
48         XLogRecData *rdata_tail;        /* last entry in the chain, or &rdata_head if
49                                                                  * empty */
50
51         XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold references to
52                                                                  * backup block data in XLogRecordAssemble() */
53 }       registered_buffer;
54
55 static registered_buffer *registered_buffers;
56 static int      max_registered_buffers;         /* allocated size */
57 static int      max_registered_block_id = 0;            /* highest block_id + 1
58                                                                                                  * currently registered */
59
60 /*
61  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
62  * with XLogRegisterData(...).
63  */
64 static XLogRecData *mainrdata_head;
65 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
66 static uint32 mainrdata_len;    /* total # of bytes in chain */
67
68 /*
69  * These are used to hold the record header while constructing a record.
70  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
71  * because we want it to be MAXALIGNed and padding bytes zeroed.
72  *
73  * For simplicity, it's allocated large enough to hold the headers for any
74  * WAL record.
75  */
76 static XLogRecData hdr_rdt;
77 static char *hdr_scratch = NULL;
78
79 #define HEADER_SCRATCH_SIZE \
80         (SizeOfXLogRecord + \
81          MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
82          SizeOfXLogRecordDataHeaderLong)
83
84 /*
85  * An array of XLogRecData structs, to hold registered data.
86  */
87 static XLogRecData *rdatas;
88 static int      num_rdatas;                     /* entries currently used */
89 static int      max_rdatas;                     /* allocated size */
90
91 static bool begininsert_called = false;
92
93 /* Memory context to hold the registered buffer and data references. */
94 static MemoryContext xloginsert_cxt;
95
96 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
97                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
98                                    XLogRecPtr *fpw_lsn);
99
100 /*
101  * Begin constructing a WAL record. This must be called before the
102  * XLogRegister* functions and XLogInsert().
103  */
104 void
105 XLogBeginInsert(void)
106 {
107         Assert(max_registered_block_id == 0);
108         Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
109         Assert(mainrdata_len == 0);
110         Assert(!begininsert_called);
111
112         /* cross-check on whether we should be here or not */
113         if (!XLogInsertAllowed())
114                 elog(ERROR, "cannot make new WAL entries during recovery");
115
116         begininsert_called = true;
117 }
118
119 /*
120  * Ensure that there are enough buffer and data slots in the working area,
121  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
122  * calls.
123  *
124  * There is always space for a small number of buffers and data chunks, enough
125  * for most record types. This function is for the exceptional cases that need
126  * more.
127  */
128 void
129 XLogEnsureRecordSpace(int max_block_id, int ndatas)
130 {
131         int                     nbuffers;
132
133         /*
134          * This must be called before entering a critical section, because
135          * allocating memory inside a critical section can fail. repalloc() will
136          * check the same, but better to check it here too so that we fail
137          * consistently even if the arrays happen to be large enough already.
138          */
139         Assert(CritSectionCount == 0);
140
141         /* the minimum values can't be decreased */
142         if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
143                 max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
144         if (ndatas < XLR_NORMAL_RDATAS)
145                 ndatas = XLR_NORMAL_RDATAS;
146
147         if (max_block_id > XLR_MAX_BLOCK_ID)
148                 elog(ERROR, "maximum number of WAL record block references exceeded");
149         nbuffers = max_block_id + 1;
150
151         if (nbuffers > max_registered_buffers)
152         {
153                 registered_buffers = (registered_buffer *)
154                         repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
155
156                 /*
157                  * At least the padding bytes in the structs must be zeroed, because
158                  * they are included in WAL data, but initialize it all for tidiness.
159                  */
160                 MemSet(&registered_buffers[max_registered_buffers], 0,
161                         (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
162                 max_registered_buffers = nbuffers;
163         }
164
165         if (ndatas > max_rdatas)
166         {
167                 rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
168                 max_rdatas = ndatas;
169         }
170 }
171
172 /*
173  * Reset WAL record construction buffers.
174  */
175 void
176 XLogResetInsertion(void)
177 {
178         int                     i;
179
180         for (i = 0; i < max_registered_block_id; i++)
181                 registered_buffers[i].in_use = false;
182
183         num_rdatas = 0;
184         max_registered_block_id = 0;
185         mainrdata_len = 0;
186         mainrdata_last = (XLogRecData *) &mainrdata_head;
187         begininsert_called = false;
188 }
189
190 /*
191  * Register a reference to a buffer with the WAL record being constructed.
192  * This must be called for every page that the WAL-logged operation modifies.
193  */
194 void
195 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
196 {
197         registered_buffer *regbuf;
198
199         /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
200         Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
201         Assert(begininsert_called);
202
203         if (block_id >= max_registered_block_id)
204         {
205                 if (block_id >= max_registered_buffers)
206                         elog(ERROR, "too many registered buffers");
207                 max_registered_block_id = block_id + 1;
208         }
209
210         regbuf = &registered_buffers[block_id];
211
212         BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
213         regbuf->page = BufferGetPage(buffer);
214         regbuf->flags = flags;
215         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
216         regbuf->rdata_len = 0;
217
218         /*
219          * Check that this page hasn't already been registered with some other
220          * block_id.
221          */
222 #ifdef USE_ASSERT_CHECKING
223         {
224                 int                     i;
225
226                 for (i = 0; i < max_registered_block_id; i++)
227                 {
228                         registered_buffer *regbuf_old = &registered_buffers[i];
229
230                         if (i == block_id || !regbuf_old->in_use)
231                                 continue;
232
233                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
234                                    regbuf_old->forkno != regbuf->forkno ||
235                                    regbuf_old->block != regbuf->block);
236                 }
237         }
238 #endif
239
240         regbuf->in_use = true;
241 }
242
243 /*
244  * Like XLogRegisterBuffer, but for registering a block that's not in the
245  * shared buffer pool (i.e. when you don't have a Buffer for it).
246  */
247 void
248 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
249                                   BlockNumber blknum, Page page, uint8 flags)
250 {
251         registered_buffer *regbuf;
252
253         /* This is currently only used to WAL-log a full-page image of a page */
254         Assert(flags & REGBUF_FORCE_IMAGE);
255         Assert(begininsert_called);
256
257         if (block_id >= max_registered_block_id)
258                 max_registered_block_id = block_id + 1;
259
260         if (block_id >= max_registered_buffers)
261                 elog(ERROR, "too many registered buffers");
262
263         regbuf = &registered_buffers[block_id];
264
265         regbuf->rnode = *rnode;
266         regbuf->forkno = forknum;
267         regbuf->block = blknum;
268         regbuf->page = page;
269         regbuf->flags = flags;
270         regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
271         regbuf->rdata_len = 0;
272
273         /*
274          * Check that this page hasn't already been registered with some other
275          * block_id.
276          */
277 #ifdef USE_ASSERT_CHECKING
278         {
279                 int                     i;
280
281                 for (i = 0; i < max_registered_block_id; i++)
282                 {
283                         registered_buffer *regbuf_old = &registered_buffers[i];
284
285                         if (i == block_id || !regbuf_old->in_use)
286                                 continue;
287
288                         Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
289                                    regbuf_old->forkno != regbuf->forkno ||
290                                    regbuf_old->block != regbuf->block);
291                 }
292         }
293 #endif
294
295         regbuf->in_use = true;
296 }
297
298 /*
299  * Add data to the WAL record that's being constructed.
300  *
301  * The data is appended to the "main chunk", available at replay with
302  * XLogGetRecData().
303  */
304 void
305 XLogRegisterData(char *data, int len)
306 {
307         XLogRecData *rdata;
308
309         Assert(begininsert_called);
310
311         if (num_rdatas >= max_rdatas)
312                 elog(ERROR, "too much WAL data");
313         rdata = &rdatas[num_rdatas++];
314
315         rdata->data = data;
316         rdata->len = len;
317
318         /*
319          * we use the mainrdata_last pointer to track the end of the chain, so no
320          * need to clear 'next' here.
321          */
322
323         mainrdata_last->next = rdata;
324         mainrdata_last = rdata;
325
326         mainrdata_len += len;
327 }
328
329 /*
330  * Add buffer-specific data to the WAL record that's being constructed.
331  *
332  * Block_id must reference a block previously registered with
333  * XLogRegisterBuffer(). If this is called more than once for the same
334  * block_id, the data is appended.
335  *
336  * The maximum amount of data that can be registered per block is 65535
337  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
338  * reconstruct the changes to the page, you might as well just log a full
339  * copy of it. (the "main data" that's not associated with a block is not
340  * limited)
341  */
342 void
343 XLogRegisterBufData(uint8 block_id, char *data, int len)
344 {
345         registered_buffer *regbuf;
346         XLogRecData *rdata;
347
348         Assert(begininsert_called);
349
350         /* find the registered buffer struct */
351         regbuf = &registered_buffers[block_id];
352         if (!regbuf->in_use)
353                 elog(ERROR, "no block with id %d registered with WAL insertion",
354                          block_id);
355
356         if (num_rdatas >= max_rdatas)
357                 elog(ERROR, "too much WAL data");
358         rdata = &rdatas[num_rdatas++];
359
360         rdata->data = data;
361         rdata->len = len;
362
363         regbuf->rdata_tail->next = rdata;
364         regbuf->rdata_tail = rdata;
365         regbuf->rdata_len += len;
366 }
367
368 /*
369  * Insert an XLOG record having the specified RMID and info bytes, with the
370  * body of the record being the data and buffer references registered earlier
371  * with XLogRegister* calls.
372  *
373  * Returns XLOG pointer to end of record (beginning of next record).
374  * This can be used as LSN for data pages affected by the logged action.
375  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
376  * before the data page can be written out.  This implements the basic
377  * WAL rule "write the log before the data".)
378  */
379 XLogRecPtr
380 XLogInsert(RmgrId rmid, uint8 info)
381 {
382         XLogRecPtr      EndPos;
383
384         /* XLogBeginInsert() must have been called. */
385         if (!begininsert_called)
386                 elog(ERROR, "XLogBeginInsert was not called");
387
388         /*
389          * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
390          * reserved for use by me.
391          */
392         if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
393                 elog(PANIC, "invalid xlog info mask %02X", info);
394
395         TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
396
397         /*
398          * In bootstrap mode, we don't actually log anything but XLOG resources;
399          * return a phony record pointer.
400          */
401         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
402         {
403                 XLogResetInsertion();
404                 EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
405                 return EndPos;
406         }
407
408         do
409         {
410                 XLogRecPtr      RedoRecPtr;
411                 bool            doPageWrites;
412                 XLogRecPtr      fpw_lsn;
413                 XLogRecData *rdt;
414
415                 /*
416                  * Get values needed to decide whether to do full-page writes. Since
417                  * we don't yet have an insertion lock, these could change under us,
418                  * but XLogInsertRecData will recheck them once it has a lock.
419                  */
420                 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
421
422                 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
423                                                                  &fpw_lsn);
424
425                 EndPos = XLogInsertRecord(rdt, fpw_lsn);
426         } while (EndPos == InvalidXLogRecPtr);
427
428         XLogResetInsertion();
429
430         return EndPos;
431 }
432
433 /*
434  * Assemble a WAL record from the registered data and buffers into an
435  * XLogRecData chain, ready for insertion with XLogInsertRecord().
436  *
437  * The record header fields are filled in, except for the xl_prev field. The
438  * calculated CRC does not include the record header yet.
439  *
440  * If there are any registered buffers, and a full-page image was not taken
441  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
442  * signals that the assembled record is only good for insertion on the
443  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
444  */
445 static XLogRecData *
446 XLogRecordAssemble(RmgrId rmid, uint8 info,
447                                    XLogRecPtr RedoRecPtr, bool doPageWrites,
448                                    XLogRecPtr *fpw_lsn)
449 {
450         XLogRecData *rdt;
451         uint32          total_len = 0;
452         int                     block_id;
453         pg_crc32        rdata_crc;
454         registered_buffer *prev_regbuf = NULL;
455         XLogRecData *rdt_datas_last;
456         XLogRecord *rechdr;
457         char       *scratch = hdr_scratch;
458
459         /*
460          * Note: this function can be called multiple times for the same record.
461          * All the modifications we do to the rdata chains below must handle that.
462          */
463
464         /* The record begins with the fixed-size header */
465         rechdr = (XLogRecord *) scratch;
466         scratch += SizeOfXLogRecord;
467
468         hdr_rdt.next = NULL;
469         rdt_datas_last = &hdr_rdt;
470         hdr_rdt.data = hdr_scratch;
471
472         /*
473          * Make an rdata chain containing all the data portions of all block
474          * references. This includes the data for full-page images. Also append
475          * the headers for the block references in the scratch buffer.
476          */
477         *fpw_lsn = InvalidXLogRecPtr;
478         for (block_id = 0; block_id < max_registered_block_id; block_id++)
479         {
480                 registered_buffer *regbuf = &registered_buffers[block_id];
481                 bool            needs_backup;
482                 bool            needs_data;
483                 XLogRecordBlockHeader bkpb;
484                 XLogRecordBlockImageHeader bimg;
485                 bool            samerel;
486
487                 if (!regbuf->in_use)
488                         continue;
489
490                 /* Determine if this block needs to be backed up */
491                 if (regbuf->flags & REGBUF_FORCE_IMAGE)
492                         needs_backup = true;
493                 else if (regbuf->flags & REGBUF_NO_IMAGE)
494                         needs_backup = false;
495                 else if (!doPageWrites)
496                         needs_backup = false;
497                 else
498                 {
499                         /*
500                          * We assume page LSN is first data on *every* page that can be
501                          * passed to XLogInsert, whether it has the standard page layout
502                          * or not.
503                          */
504                         XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
505
506                         needs_backup = (page_lsn <= RedoRecPtr);
507                         if (!needs_backup)
508                         {
509                                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
510                                         *fpw_lsn = page_lsn;
511                         }
512                 }
513
514                 /* Determine if the buffer data needs to included */
515                 if (regbuf->rdata_len == 0)
516                         needs_data = false;
517                 else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
518                         needs_data = true;
519                 else
520                         needs_data = !needs_backup;
521
522                 bkpb.id = block_id;
523                 bkpb.fork_flags = regbuf->forkno;
524                 bkpb.data_length = 0;
525
526                 if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
527                         bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
528
529                 if (needs_backup)
530                 {
531                         Page            page = regbuf->page;
532
533                         /*
534                          * The page needs to be backed up, so set up *bimg
535                          */
536                         if (regbuf->flags & REGBUF_STANDARD)
537                         {
538                                 /* Assume we can omit data between pd_lower and pd_upper */
539                                 uint16          lower = ((PageHeader) page)->pd_lower;
540                                 uint16          upper = ((PageHeader) page)->pd_upper;
541
542                                 if (lower >= SizeOfPageHeaderData &&
543                                         upper > lower &&
544                                         upper <= BLCKSZ)
545                                 {
546                                         bimg.hole_offset = lower;
547                                         bimg.hole_length = upper - lower;
548                                 }
549                                 else
550                                 {
551                                         /* No "hole" to compress out */
552                                         bimg.hole_offset = 0;
553                                         bimg.hole_length = 0;
554                                 }
555                         }
556                         else
557                         {
558                                 /* Not a standard page header, don't try to eliminate "hole" */
559                                 bimg.hole_offset = 0;
560                                 bimg.hole_length = 0;
561                         }
562
563                         /* Fill in the remaining fields in the XLogRecordBlockData struct */
564                         bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
565
566                         total_len += BLCKSZ - bimg.hole_length;
567
568                         /*
569                          * Construct XLogRecData entries for the page content.
570                          */
571                         rdt_datas_last->next = &regbuf->bkp_rdatas[0];
572                         rdt_datas_last = rdt_datas_last->next;
573                         if (bimg.hole_length == 0)
574                         {
575                                 rdt_datas_last->data = page;
576                                 rdt_datas_last->len = BLCKSZ;
577                         }
578                         else
579                         {
580                                 /* must skip the hole */
581                                 rdt_datas_last->data = page;
582                                 rdt_datas_last->len = bimg.hole_offset;
583
584                                 rdt_datas_last->next = &regbuf->bkp_rdatas[1];
585                                 rdt_datas_last = rdt_datas_last->next;
586
587                                 rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
588                                 rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
589                         }
590                 }
591
592                 if (needs_data)
593                 {
594                         /*
595                          * Link the caller-supplied rdata chain for this buffer to the
596                          * overall list.
597                          */
598                         bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
599                         bkpb.data_length = regbuf->rdata_len;
600                         total_len += regbuf->rdata_len;
601
602                         rdt_datas_last->next = regbuf->rdata_head;
603                         rdt_datas_last = regbuf->rdata_tail;
604                 }
605
606                 if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
607                 {
608                         samerel = true;
609                         bkpb.fork_flags |= BKPBLOCK_SAME_REL;
610                         prev_regbuf = regbuf;
611                 }
612                 else
613                         samerel = false;
614
615                 /* Ok, copy the header to the scratch buffer */
616                 memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
617                 scratch += SizeOfXLogRecordBlockHeader;
618                 if (needs_backup)
619                 {
620                         memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
621                         scratch += SizeOfXLogRecordBlockImageHeader;
622                 }
623                 if (!samerel)
624                 {
625                         memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
626                         scratch += sizeof(RelFileNode);
627                 }
628                 memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
629                 scratch += sizeof(BlockNumber);
630         }
631
632         /* followed by main data, if any */
633         if (mainrdata_len > 0)
634         {
635                 if (mainrdata_len > 255)
636                 {
637                         *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
638                         memcpy(scratch, &mainrdata_len, sizeof(uint32));
639                         scratch += sizeof(uint32);
640                 }
641                 else
642                 {
643                         *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
644                         *(scratch++) = (uint8) mainrdata_len;
645                 }
646                 rdt_datas_last->next = mainrdata_head;
647                 rdt_datas_last = mainrdata_last;
648                 total_len += mainrdata_len;
649         }
650         rdt_datas_last->next = NULL;
651
652         hdr_rdt.len = (scratch - hdr_scratch);
653         total_len += hdr_rdt.len;
654
655         /*
656          * Calculate CRC of the data
657          *
658          * Note that the record header isn't added into the CRC initially since we
659          * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
660          * the whole record in the order: rdata, then backup blocks, then record
661          * header.
662          */
663         INIT_CRC32C(rdata_crc);
664         COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
665         for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
666                 COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
667
668         /*
669          * Fill in the fields in the record header. Prev-link is filled in later,
670          * once we know where in the WAL the record will be inserted. The CRC does
671          * not include the record header yet.
672          */
673         rechdr->xl_xid = GetCurrentTransactionIdIfAny();
674         rechdr->xl_tot_len = total_len;
675         rechdr->xl_info = info;
676         rechdr->xl_rmid = rmid;
677         rechdr->xl_prev = InvalidXLogRecPtr;
678         rechdr->xl_crc = rdata_crc;
679
680         return &hdr_rdt;
681 }
682
683 /*
684  * Determine whether the buffer referenced has to be backed up.
685  *
686  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
687  * could change later, so the result should be used for optimization purposes
688  * only.
689  */
690 bool
691 XLogCheckBufferNeedsBackup(Buffer buffer)
692 {
693         XLogRecPtr      RedoRecPtr;
694         bool            doPageWrites;
695         Page            page;
696
697         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
698
699         page = BufferGetPage(buffer);
700
701         if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
702                 return true;                    /* buffer requires backup */
703
704         return false;                           /* buffer does not need to be backed up */
705 }
706
707 /*
708  * Write a backup block if needed when we are setting a hint. Note that
709  * this may be called for a variety of page types, not just heaps.
710  *
711  * Callable while holding just share lock on the buffer content.
712  *
713  * We can't use the plain backup block mechanism since that relies on the
714  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
715  * bits) are allowed in a sharelocked buffer that can lead to wal checksum
716  * failures. So instead we copy the page and insert the copied data as normal
717  * record data.
718  *
719  * We only need to do something if page has not yet been full page written in
720  * this checkpoint round. The LSN of the inserted wal record is returned if we
721  * had to write, InvalidXLogRecPtr otherwise.
722  *
723  * It is possible that multiple concurrent backends could attempt to write WAL
724  * records. In that case, multiple copies of the same block would be recorded
725  * in separate WAL records by different backends, though that is still OK from
726  * a correctness perspective.
727  */
728 XLogRecPtr
729 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
730 {
731         XLogRecPtr      recptr = InvalidXLogRecPtr;
732         XLogRecPtr      lsn;
733         XLogRecPtr      RedoRecPtr;
734
735         /*
736          * Ensure no checkpoint can change our view of RedoRecPtr.
737          */
738         Assert(MyPgXact->delayChkpt);
739
740         /*
741          * Update RedoRecPtr so that we can make the right decision
742          */
743         RedoRecPtr = GetRedoRecPtr();
744
745         /*
746          * We assume page LSN is first data on *every* page that can be passed to
747          * XLogInsert, whether it has the standard page layout or not. Since we're
748          * only holding a share-lock on the page, we must take the buffer header
749          * lock when we look at the LSN.
750          */
751         lsn = BufferGetLSNAtomic(buffer);
752
753         if (lsn <= RedoRecPtr)
754         {
755                 int                     flags;
756                 char            copied_buffer[BLCKSZ];
757                 char       *origdata = (char *) BufferGetBlock(buffer);
758                 RelFileNode rnode;
759                 ForkNumber      forkno;
760                 BlockNumber blkno;
761
762                 /*
763                  * Copy buffer so we don't have to worry about concurrent hint bit or
764                  * lsn updates. We assume pd_lower/upper cannot be changed without an
765                  * exclusive lock, so the contents bkp are not racy.
766                  */
767                 if (buffer_std)
768                 {
769                         /* Assume we can omit data between pd_lower and pd_upper */
770                         Page            page = BufferGetPage(buffer);
771                         uint16          lower = ((PageHeader) page)->pd_lower;
772                         uint16          upper = ((PageHeader) page)->pd_upper;
773
774                         memcpy(copied_buffer, origdata, lower);
775                         memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
776                 }
777                 else
778                         memcpy(copied_buffer, origdata, BLCKSZ);
779
780                 XLogBeginInsert();
781
782                 flags = REGBUF_FORCE_IMAGE;
783                 if (buffer_std)
784                         flags |= REGBUF_STANDARD;
785
786                 BufferGetTag(buffer, &rnode, &forkno, &blkno);
787                 XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
788
789                 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
790         }
791
792         return recptr;
793 }
794
795 /*
796  * Write a WAL record containing a full image of a page. Caller is responsible
797  * for writing the page to disk after calling this routine.
798  *
799  * Note: If you're using this function, you should be building pages in private
800  * memory and writing them directly to smgr.  If you're using buffers, call
801  * log_newpage_buffer instead.
802  *
803  * If the page follows the standard page layout, with a PageHeader and unused
804  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
805  * the unused space to be left out from the WAL record, making it smaller.
806  */
807 XLogRecPtr
808 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
809                         Page page, bool page_std)
810 {
811         int                     flags;
812         XLogRecPtr      recptr;
813
814         flags = REGBUF_FORCE_IMAGE;
815         if (page_std)
816                 flags |= REGBUF_STANDARD;
817
818         XLogBeginInsert();
819         XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
820         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
821
822         /*
823          * The page may be uninitialized. If so, we can't set the LSN because that
824          * would corrupt the page.
825          */
826         if (!PageIsNew(page))
827         {
828                 PageSetLSN(page, recptr);
829         }
830
831         return recptr;
832 }
833
834 /*
835  * Write a WAL record containing a full image of a page.
836  *
837  * Caller should initialize the buffer and mark it dirty before calling this
838  * function.  This function will set the page LSN.
839  *
840  * If the page follows the standard page layout, with a PageHeader and unused
841  * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
842  * the unused space to be left out from the WAL record, making it smaller.
843  */
844 XLogRecPtr
845 log_newpage_buffer(Buffer buffer, bool page_std)
846 {
847         Page            page = BufferGetPage(buffer);
848         RelFileNode rnode;
849         ForkNumber      forkNum;
850         BlockNumber blkno;
851
852         /* Shared buffers should be modified in a critical section. */
853         Assert(CritSectionCount > 0);
854
855         BufferGetTag(buffer, &rnode, &forkNum, &blkno);
856
857         return log_newpage(&rnode, forkNum, blkno, page, page_std);
858 }
859
860 /*
861  * Allocate working buffers needed for WAL record construction.
862  */
863 void
864 InitXLogInsert(void)
865 {
866         /* Initialize the working areas */
867         if (xloginsert_cxt == NULL)
868         {
869                 xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
870                                                                                            "WAL record construction",
871                                                                                            ALLOCSET_DEFAULT_MINSIZE,
872                                                                                            ALLOCSET_DEFAULT_INITSIZE,
873                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
874         }
875
876         if (registered_buffers == NULL)
877         {
878                 registered_buffers = (registered_buffer *)
879                         MemoryContextAllocZero(xloginsert_cxt,
880                                   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
881                 max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
882         }
883         if (rdatas == NULL)
884         {
885                 rdatas = MemoryContextAlloc(xloginsert_cxt,
886                                                                         sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
887                 max_rdatas = XLR_NORMAL_RDATAS;
888         }
889
890         /*
891          * Allocate a buffer to hold the header information for a WAL record.
892          */
893         if (hdr_scratch == NULL)
894                 hdr_scratch = palloc0(HEADER_SCRATCH_SIZE);
895 }