]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/clog.c
Update copyright for the year 2010.
[postgresql] / src / backend / access / transam / clog.c
1 /*-------------------------------------------------------------------------
2  *
3  * clog.c
4  *              PostgreSQL transaction-commit-log manager
5  *
6  * This module replaces the old "pg_log" access code, which treated pg_log
7  * essentially like a relation, in that it went through the regular buffer
8  * manager.  The problem with that was that there wasn't any good way to
9  * recycle storage space for transactions so old that they'll never be
10  * looked up again.  Now we use specialized access code so that the commit
11  * log can be broken into relatively small, independent segments.
12  *
13  * XLOG interactions: this module generates an XLOG record whenever a new
14  * CLOG page is initialized to zeroes.  Other writes of CLOG come from
15  * recording of transaction commit or abort in xact.c, which generates its
16  * own XLOG records for these events and will re-perform the status update
17  * on redo; so we need make no additional XLOG entry here.      For synchronous
18  * transaction commits, the XLOG is guaranteed flushed through the XLOG commit
19  * record before we are called to log a commit, so the WAL rule "write xlog
20  * before data" is satisfied automatically.  However, for async commits we
21  * must track the latest LSN affecting each CLOG page, so that we can flush
22  * XLOG that far and satisfy the WAL rule.      We don't have to worry about this
23  * for aborts (whether sync or async), since the post-crash assumption would
24  * be that such transactions failed anyway.
25  *
26  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.55 2010/01/02 16:57:35 momjian Exp $
30  *
31  *-------------------------------------------------------------------------
32  */
33 #include "postgres.h"
34
35 #include "access/clog.h"
36 #include "access/slru.h"
37 #include "access/transam.h"
38 #include "pg_trace.h"
39 #include "postmaster/bgwriter.h"
40
41 /*
42  * Defines for CLOG page sizes.  A page is the same BLCKSZ as is used
43  * everywhere else in Postgres.
44  *
45  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
46  * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
47  * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
48  * take no explicit notice of that fact in this module, except when comparing
49  * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
50  */
51
52 /* We need two bits per xact, so four xacts fit in a byte */
53 #define CLOG_BITS_PER_XACT      2
54 #define CLOG_XACTS_PER_BYTE 4
55 #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
56 #define CLOG_XACT_BITMASK       ((1 << CLOG_BITS_PER_XACT) - 1)
57
58 #define TransactionIdToPage(xid)        ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
59 #define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
60 #define TransactionIdToByte(xid)        (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
61 #define TransactionIdToBIndex(xid)      ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
62
63 /* We store the latest async LSN for each group of transactions */
64 #define CLOG_XACTS_PER_LSN_GROUP        32      /* keep this a power of 2 */
65 #define CLOG_LSNS_PER_PAGE      (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)
66
67 #define GetLSNIndex(slotno, xid)        ((slotno) * CLOG_LSNS_PER_PAGE + \
68         ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
69
70
71 /*
72  * Link to shared-memory data structures for CLOG control
73  */
74 static SlruCtlData ClogCtlData;
75
76 #define ClogCtl (&ClogCtlData)
77
78
79 static int      ZeroCLOGPage(int pageno, bool writeXlog);
80 static bool CLOGPagePrecedes(int page1, int page2);
81 static void WriteZeroPageXlogRec(int pageno);
82 static void WriteTruncateXlogRec(int pageno);
83 static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
84                                                    TransactionId *subxids, XidStatus status,
85                                                    XLogRecPtr lsn, int pageno);
86 static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
87                                                   XLogRecPtr lsn, int slotno);
88 static void set_status_by_pages(int nsubxids, TransactionId *subxids,
89                                         XidStatus status, XLogRecPtr lsn);
90
91
92 /*
93  * TransactionIdSetTreeStatus
94  *
95  * Record the final state of transaction entries in the commit log for
96  * a transaction and its subtransaction tree. Take care to ensure this is
97  * efficient, and as atomic as possible.
98  *
99  * xid is a single xid to set status for. This will typically be
100  * the top level transactionid for a top level commit or abort. It can
101  * also be a subtransaction when we record transaction aborts.
102  *
103  * subxids is an array of xids of length nsubxids, representing subtransactions
104  * in the tree of xid. In various cases nsubxids may be zero.
105  *
106  * lsn must be the WAL location of the commit record when recording an async
107  * commit.      For a synchronous commit it can be InvalidXLogRecPtr, since the
108  * caller guarantees the commit record is already flushed in that case.  It
109  * should be InvalidXLogRecPtr for abort cases, too.
110  *
111  * In the commit case, atomicity is limited by whether all the subxids are in
112  * the same CLOG page as xid.  If they all are, then the lock will be grabbed
113  * only once, and the status will be set to committed directly.  Otherwise
114  * we must
115  *       1. set sub-committed all subxids that are not on the same page as the
116  *              main xid
117  *       2. atomically set committed the main xid and the subxids on the same page
118  *       3. go over the first bunch again and set them committed
119  * Note that as far as concurrent checkers are concerned, main transaction
120  * commit as a whole is still atomic.
121  *
122  * Example:
123  *              TransactionId t commits and has subxids t1, t2, t3, t4
124  *              t is on page p1, t1 is also on p1, t2 and t3 are on p2, t4 is on p3
125  *              1. update pages2-3:
126  *                                      page2: set t2,t3 as sub-committed
127  *                                      page3: set t4 as sub-committed
128  *              2. update page1:
129  *                                      set t1 as sub-committed,
130  *                                      then set t as committed,
131                                         then set t1 as committed
132  *              3. update pages2-3:
133  *                                      page2: set t2,t3 as committed
134  *                                      page3: set t4 as committed
135  *
136  * NB: this is a low-level routine and is NOT the preferred entry point
137  * for most uses; functions in transam.c are the intended callers.
138  *
139  * XXX Think about issuing FADVISE_WILLNEED on pages that we will need,
140  * but aren't yet in cache, as well as hinting pages not to fall out of
141  * cache yet.
142  */
143 void
144 TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
145                                         TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
146 {
147         int                     pageno = TransactionIdToPage(xid);              /* get page of parent */
148         int                     i;
149
150         Assert(status == TRANSACTION_STATUS_COMMITTED ||
151                    status == TRANSACTION_STATUS_ABORTED);
152
153         /*
154          * See how many subxids, if any, are on the same page as the parent, if
155          * any.
156          */
157         for (i = 0; i < nsubxids; i++)
158         {
159                 if (TransactionIdToPage(subxids[i]) != pageno)
160                         break;
161         }
162
163         /*
164          * Do all items fit on a single page?
165          */
166         if (i == nsubxids)
167         {
168                 /*
169                  * Set the parent and all subtransactions in a single call
170                  */
171                 TransactionIdSetPageStatus(xid, nsubxids, subxids, status, lsn,
172                                                                    pageno);
173         }
174         else
175         {
176                 int                     nsubxids_on_first_page = i;
177
178                 /*
179                  * If this is a commit then we care about doing this correctly (i.e.
180                  * using the subcommitted intermediate status).  By here, we know
181                  * we're updating more than one page of clog, so we must mark entries
182                  * that are *not* on the first page so that they show as subcommitted
183                  * before we then return to update the status to fully committed.
184                  *
185                  * To avoid touching the first page twice, skip marking subcommitted
186                  * for the subxids on that first page.
187                  */
188                 if (status == TRANSACTION_STATUS_COMMITTED)
189                         set_status_by_pages(nsubxids - nsubxids_on_first_page,
190                                                                 subxids + nsubxids_on_first_page,
191                                                                 TRANSACTION_STATUS_SUB_COMMITTED, lsn);
192
193                 /*
194                  * Now set the parent and subtransactions on same page as the parent,
195                  * if any
196                  */
197                 pageno = TransactionIdToPage(xid);
198                 TransactionIdSetPageStatus(xid, nsubxids_on_first_page, subxids, status,
199                                                                    lsn, pageno);
200
201                 /*
202                  * Now work through the rest of the subxids one clog page at a time,
203                  * starting from the second page onwards, like we did above.
204                  */
205                 set_status_by_pages(nsubxids - nsubxids_on_first_page,
206                                                         subxids + nsubxids_on_first_page,
207                                                         status, lsn);
208         }
209 }
210
211 /*
212  * Helper for TransactionIdSetTreeStatus: set the status for a bunch of
213  * transactions, chunking in the separate CLOG pages involved. We never
214  * pass the whole transaction tree to this function, only subtransactions
215  * that are on different pages to the top level transaction id.
216  */
217 static void
218 set_status_by_pages(int nsubxids, TransactionId *subxids,
219                                         XidStatus status, XLogRecPtr lsn)
220 {
221         int                     pageno = TransactionIdToPage(subxids[0]);
222         int                     offset = 0;
223         int                     i = 0;
224
225         while (i < nsubxids)
226         {
227                 int                     num_on_page = 0;
228
229                 while (TransactionIdToPage(subxids[i]) == pageno && i < nsubxids)
230                 {
231                         num_on_page++;
232                         i++;
233                 }
234
235                 TransactionIdSetPageStatus(InvalidTransactionId,
236                                                                    num_on_page, subxids + offset,
237                                                                    status, lsn, pageno);
238                 offset = i;
239                 pageno = TransactionIdToPage(subxids[offset]);
240         }
241 }
242
243 /*
244  * Record the final state of transaction entries in the commit log for
245  * all entries on a single page.  Atomic only on this page.
246  *
247  * Otherwise API is same as TransactionIdSetTreeStatus()
248  */
249 static void
250 TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
251                                                    TransactionId *subxids, XidStatus status,
252                                                    XLogRecPtr lsn, int pageno)
253 {
254         int                     slotno;
255         int                     i;
256
257         Assert(status == TRANSACTION_STATUS_COMMITTED ||
258                    status == TRANSACTION_STATUS_ABORTED ||
259                    (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
260
261         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
262
263         /*
264          * If we're doing an async commit (ie, lsn is valid), then we must wait
265          * for any active write on the page slot to complete.  Otherwise our
266          * update could reach disk in that write, which will not do since we
267          * mustn't let it reach disk until we've done the appropriate WAL flush.
268          * But when lsn is invalid, it's OK to scribble on a page while it is
269          * write-busy, since we don't care if the update reaches disk sooner than
270          * we think.
271          */
272         slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
273
274         /*
275          * Set the main transaction id, if any.
276          *
277          * If we update more than one xid on this page while it is being written
278          * out, we might find that some of the bits go to disk and others don't.
279          * If we are updating commits on the page with the top-level xid that
280          * could break atomicity, so we subcommit the subxids first before we mark
281          * the top-level commit.
282          */
283         if (TransactionIdIsValid(xid))
284         {
285                 /* Subtransactions first, if needed ... */
286                 if (status == TRANSACTION_STATUS_COMMITTED)
287                 {
288                         for (i = 0; i < nsubxids; i++)
289                         {
290                                 Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
291                                 TransactionIdSetStatusBit(subxids[i],
292                                                                                   TRANSACTION_STATUS_SUB_COMMITTED,
293                                                                                   lsn, slotno);
294                         }
295                 }
296
297                 /* ... then the main transaction */
298                 TransactionIdSetStatusBit(xid, status, lsn, slotno);
299         }
300
301         /* Set the subtransactions */
302         for (i = 0; i < nsubxids; i++)
303         {
304                 Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
305                 TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
306         }
307
308         ClogCtl->shared->page_dirty[slotno] = true;
309
310         LWLockRelease(CLogControlLock);
311 }
312
313 /*
314  * Sets the commit status of a single transaction.
315  *
316  * Must be called with CLogControlLock held
317  */
318 static void
319 TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
320 {
321         int                     byteno = TransactionIdToByte(xid);
322         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
323         char       *byteptr;
324         char            byteval;
325         char            curval;
326
327         byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
328         curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
329
330         /*
331          * When replaying transactions during recovery we still need to perform
332          * the two phases of subcommit and then commit. However, some transactions
333          * are already correctly marked, so we just treat those as a no-op which
334          * allows us to keep the following Assert as restrictive as possible.
335          */
336         if (InRecovery && status == TRANSACTION_STATUS_SUB_COMMITTED &&
337                 curval == TRANSACTION_STATUS_COMMITTED)
338                 return;
339
340         /*
341          * Current state change should be from 0 or subcommitted to target state
342          * or we should already be there when replaying changes during recovery.
343          */
344         Assert(curval == 0 ||
345                    (curval == TRANSACTION_STATUS_SUB_COMMITTED &&
346                         status != TRANSACTION_STATUS_IN_PROGRESS) ||
347                    curval == status);
348
349         /* note this assumes exclusive access to the clog page */
350         byteval = *byteptr;
351         byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
352         byteval |= (status << bshift);
353         *byteptr = byteval;
354
355         /*
356          * Update the group LSN if the transaction completion LSN is higher.
357          *
358          * Note: lsn will be invalid when supplied during InRecovery processing,
359          * so we don't need to do anything special to avoid LSN updates during
360          * recovery. After recovery completes the next clog change will set the
361          * LSN correctly.
362          */
363         if (!XLogRecPtrIsInvalid(lsn))
364         {
365                 int                     lsnindex = GetLSNIndex(slotno, xid);
366
367                 if (XLByteLT(ClogCtl->shared->group_lsn[lsnindex], lsn))
368                         ClogCtl->shared->group_lsn[lsnindex] = lsn;
369         }
370 }
371
372 /*
373  * Interrogate the state of a transaction in the commit log.
374  *
375  * Aside from the actual commit status, this function returns (into *lsn)
376  * an LSN that is late enough to be able to guarantee that if we flush up to
377  * that LSN then we will have flushed the transaction's commit record to disk.
378  * The result is not necessarily the exact LSN of the transaction's commit
379  * record!      For example, for long-past transactions (those whose clog pages
380  * already migrated to disk), we'll return InvalidXLogRecPtr.  Also, because
381  * we group transactions on the same clog page to conserve storage, we might
382  * return the LSN of a later transaction that falls into the same group.
383  *
384  * NB: this is a low-level routine and is NOT the preferred entry point
385  * for most uses; TransactionLogFetch() in transam.c is the intended caller.
386  */
387 XidStatus
388 TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
389 {
390         int                     pageno = TransactionIdToPage(xid);
391         int                     byteno = TransactionIdToByte(xid);
392         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
393         int                     slotno;
394         int                     lsnindex;
395         char       *byteptr;
396         XidStatus       status;
397
398         /* lock is acquired by SimpleLruReadPage_ReadOnly */
399
400         slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);
401         byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
402
403         status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
404
405         lsnindex = GetLSNIndex(slotno, xid);
406         *lsn = ClogCtl->shared->group_lsn[lsnindex];
407
408         LWLockRelease(CLogControlLock);
409
410         return status;
411 }
412
413
414 /*
415  * Initialization of shared memory for CLOG
416  */
417 Size
418 CLOGShmemSize(void)
419 {
420         return SimpleLruShmemSize(NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE);
421 }
422
423 void
424 CLOGShmemInit(void)
425 {
426         ClogCtl->PagePrecedes = CLOGPagePrecedes;
427         SimpleLruInit(ClogCtl, "CLOG Ctl", NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE,
428                                   CLogControlLock, "pg_clog");
429 }
430
431 /*
432  * This func must be called ONCE on system install.  It creates
433  * the initial CLOG segment.  (The CLOG directory is assumed to
434  * have been created by the initdb shell script, and CLOGShmemInit
435  * must have been called already.)
436  */
437 void
438 BootStrapCLOG(void)
439 {
440         int                     slotno;
441
442         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
443
444         /* Create and zero the first page of the commit log */
445         slotno = ZeroCLOGPage(0, false);
446
447         /* Make sure it's written out */
448         SimpleLruWritePage(ClogCtl, slotno, NULL);
449         Assert(!ClogCtl->shared->page_dirty[slotno]);
450
451         LWLockRelease(CLogControlLock);
452 }
453
454 /*
455  * Initialize (or reinitialize) a page of CLOG to zeroes.
456  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
457  *
458  * The page is not actually written, just set up in shared memory.
459  * The slot number of the new page is returned.
460  *
461  * Control lock must be held at entry, and will be held at exit.
462  */
463 static int
464 ZeroCLOGPage(int pageno, bool writeXlog)
465 {
466         int                     slotno;
467
468         slotno = SimpleLruZeroPage(ClogCtl, pageno);
469
470         if (writeXlog)
471                 WriteZeroPageXlogRec(pageno);
472
473         return slotno;
474 }
475
476 /*
477  * This must be called ONCE during postmaster or standalone-backend startup,
478  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
479  */
480 void
481 StartupCLOG(void)
482 {
483         TransactionId xid = ShmemVariableCache->nextXid;
484         int                     pageno = TransactionIdToPage(xid);
485
486         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
487
488         /*
489          * Initialize our idea of the latest page number.
490          */
491         ClogCtl->shared->latest_page_number = pageno;
492
493         /*
494          * Zero out the remainder of the current clog page.  Under normal
495          * circumstances it should be zeroes already, but it seems at least
496          * theoretically possible that XLOG replay will have settled on a nextXID
497          * value that is less than the last XID actually used and marked by the
498          * previous database lifecycle (since subtransaction commit writes clog
499          * but makes no WAL entry).  Let's just be safe. (We need not worry about
500          * pages beyond the current one, since those will be zeroed when first
501          * used.  For the same reason, there is no need to do anything when
502          * nextXid is exactly at a page boundary; and it's likely that the
503          * "current" page doesn't exist yet in that case.)
504          */
505         if (TransactionIdToPgIndex(xid) != 0)
506         {
507                 int                     byteno = TransactionIdToByte(xid);
508                 int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
509                 int                     slotno;
510                 char       *byteptr;
511
512                 slotno = SimpleLruReadPage(ClogCtl, pageno, false, xid);
513                 byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
514
515                 /* Zero so-far-unused positions in the current byte */
516                 *byteptr &= (1 << bshift) - 1;
517                 /* Zero the rest of the page */
518                 MemSet(byteptr + 1, 0, BLCKSZ - byteno - 1);
519
520                 ClogCtl->shared->page_dirty[slotno] = true;
521         }
522
523         LWLockRelease(CLogControlLock);
524 }
525
526 /*
527  * This must be called ONCE during postmaster or standalone-backend shutdown
528  */
529 void
530 ShutdownCLOG(void)
531 {
532         /* Flush dirty CLOG pages to disk */
533         TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(false);
534         SimpleLruFlush(ClogCtl, false);
535         TRACE_POSTGRESQL_CLOG_CHECKPOINT_DONE(false);
536 }
537
538 /*
539  * Perform a checkpoint --- either during shutdown, or on-the-fly
540  */
541 void
542 CheckPointCLOG(void)
543 {
544         /* Flush dirty CLOG pages to disk */
545         TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(true);
546         SimpleLruFlush(ClogCtl, true);
547         TRACE_POSTGRESQL_CLOG_CHECKPOINT_DONE(true);
548 }
549
550
551 /*
552  * Make sure that CLOG has room for a newly-allocated XID.
553  *
554  * NB: this is called while holding XidGenLock.  We want it to be very fast
555  * most of the time; even when it's not so fast, no actual I/O need happen
556  * unless we're forced to write out a dirty clog or xlog page to make room
557  * in shared memory.
558  */
559 void
560 ExtendCLOG(TransactionId newestXact)
561 {
562         int                     pageno;
563
564         /*
565          * No work except at first XID of a page.  But beware: just after
566          * wraparound, the first XID of page zero is FirstNormalTransactionId.
567          */
568         if (TransactionIdToPgIndex(newestXact) != 0 &&
569                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
570                 return;
571
572         pageno = TransactionIdToPage(newestXact);
573
574         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
575
576         /* Zero the page and make an XLOG entry about it */
577         ZeroCLOGPage(pageno, !InRecovery);
578
579         LWLockRelease(CLogControlLock);
580 }
581
582
583 /*
584  * Remove all CLOG segments before the one holding the passed transaction ID
585  *
586  * Before removing any CLOG data, we must flush XLOG to disk, to ensure
587  * that any recently-emitted HEAP_FREEZE records have reached disk; otherwise
588  * a crash and restart might leave us with some unfrozen tuples referencing
589  * removed CLOG data.  We choose to emit a special TRUNCATE XLOG record too.
590  * Replaying the deletion from XLOG is not critical, since the files could
591  * just as well be removed later, but doing so prevents a long-running hot
592  * standby server from acquiring an unreasonably bloated CLOG directory.
593  *
594  * Since CLOG segments hold a large number of transactions, the opportunity to
595  * actually remove a segment is fairly rare, and so it seems best not to do
596  * the XLOG flush unless we have confirmed that there is a removable segment.
597  */
598 void
599 TruncateCLOG(TransactionId oldestXact)
600 {
601         int                     cutoffPage;
602
603         /*
604          * The cutoff point is the start of the segment containing oldestXact. We
605          * pass the *page* containing oldestXact to SimpleLruTruncate.
606          */
607         cutoffPage = TransactionIdToPage(oldestXact);
608
609         /* Check to see if there's any files that could be removed */
610         if (!SlruScanDirectory(ClogCtl, cutoffPage, false))
611                 return;                                 /* nothing to remove */
612
613         /* Write XLOG record and flush XLOG to disk */
614         WriteTruncateXlogRec(cutoffPage);
615
616         /* Now we can remove the old CLOG segment(s) */
617         SimpleLruTruncate(ClogCtl, cutoffPage);
618 }
619
620
621 /*
622  * Decide which of two CLOG page numbers is "older" for truncation purposes.
623  *
624  * We need to use comparison of TransactionIds here in order to do the right
625  * thing with wraparound XID arithmetic.  However, if we are asked about
626  * page number zero, we don't want to hand InvalidTransactionId to
627  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
628  * offset both xids by FirstNormalTransactionId to avoid that.
629  */
630 static bool
631 CLOGPagePrecedes(int page1, int page2)
632 {
633         TransactionId xid1;
634         TransactionId xid2;
635
636         xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
637         xid1 += FirstNormalTransactionId;
638         xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
639         xid2 += FirstNormalTransactionId;
640
641         return TransactionIdPrecedes(xid1, xid2);
642 }
643
644
645 /*
646  * Write a ZEROPAGE xlog record
647  */
648 static void
649 WriteZeroPageXlogRec(int pageno)
650 {
651         XLogRecData rdata;
652
653         rdata.data = (char *) (&pageno);
654         rdata.len = sizeof(int);
655         rdata.buffer = InvalidBuffer;
656         rdata.next = NULL;
657         (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE, &rdata);
658 }
659
660 /*
661  * Write a TRUNCATE xlog record
662  *
663  * We must flush the xlog record to disk before returning --- see notes
664  * in TruncateCLOG().
665  */
666 static void
667 WriteTruncateXlogRec(int pageno)
668 {
669         XLogRecData rdata;
670         XLogRecPtr      recptr;
671
672         rdata.data = (char *) (&pageno);
673         rdata.len = sizeof(int);
674         rdata.buffer = InvalidBuffer;
675         rdata.next = NULL;
676         recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata);
677         XLogFlush(recptr);
678 }
679
680 /*
681  * CLOG resource manager's routines
682  */
683 void
684 clog_redo(XLogRecPtr lsn, XLogRecord *record)
685 {
686         uint8           info = record->xl_info & ~XLR_INFO_MASK;
687
688         /* Backup blocks are not used in clog records */
689         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
690
691         if (info == CLOG_ZEROPAGE)
692         {
693                 int                     pageno;
694                 int                     slotno;
695
696                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
697
698                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
699
700                 slotno = ZeroCLOGPage(pageno, false);
701                 SimpleLruWritePage(ClogCtl, slotno, NULL);
702                 Assert(!ClogCtl->shared->page_dirty[slotno]);
703
704                 LWLockRelease(CLogControlLock);
705         }
706         else if (info == CLOG_TRUNCATE)
707         {
708                 int                     pageno;
709
710                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
711
712                 /*
713                  * During XLOG replay, latest_page_number isn't set up yet; insert a
714                  * suitable value to bypass the sanity test in SimpleLruTruncate.
715                  */
716                 ClogCtl->shared->latest_page_number = pageno;
717
718                 SimpleLruTruncate(ClogCtl, pageno);
719         }
720         else
721                 elog(PANIC, "clog_redo: unknown op code %u", info);
722 }
723
724 void
725 clog_desc(StringInfo buf, uint8 xl_info, char *rec)
726 {
727         uint8           info = xl_info & ~XLR_INFO_MASK;
728
729         if (info == CLOG_ZEROPAGE)
730         {
731                 int                     pageno;
732
733                 memcpy(&pageno, rec, sizeof(int));
734                 appendStringInfo(buf, "zeropage: %d", pageno);
735         }
736         else if (info == CLOG_TRUNCATE)
737         {
738                 int                     pageno;
739
740                 memcpy(&pageno, rec, sizeof(int));
741                 appendStringInfo(buf, "truncate before: %d", pageno);
742         }
743         else
744                 appendStringInfo(buf, "UNKNOWN");
745 }