]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/clog.c
pgindent run for 8.3.
[postgresql] / src / backend / access / transam / clog.c
1 /*-------------------------------------------------------------------------
2  *
3  * clog.c
4  *              PostgreSQL transaction-commit-log manager
5  *
6  * This module replaces the old "pg_log" access code, which treated pg_log
7  * essentially like a relation, in that it went through the regular buffer
8  * manager.  The problem with that was that there wasn't any good way to
9  * recycle storage space for transactions so old that they'll never be
10  * looked up again.  Now we use specialized access code so that the commit
11  * log can be broken into relatively small, independent segments.
12  *
13  * XLOG interactions: this module generates an XLOG record whenever a new
14  * CLOG page is initialized to zeroes.  Other writes of CLOG come from
15  * recording of transaction commit or abort in xact.c, which generates its
16  * own XLOG records for these events and will re-perform the status update
17  * on redo; so we need make no additional XLOG entry here.      For synchronous
18  * transaction commits, the XLOG is guaranteed flushed through the XLOG commit
19  * record before we are called to log a commit, so the WAL rule "write xlog
20  * before data" is satisfied automatically.  However, for async commits we
21  * must track the latest LSN affecting each CLOG page, so that we can flush
22  * XLOG that far and satisfy the WAL rule.      We don't have to worry about this
23  * for aborts (whether sync or async), since the post-crash assumption would
24  * be that such transactions failed anyway.
25  *
26  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.45 2007/11/15 21:14:32 momjian Exp $
30  *
31  *-------------------------------------------------------------------------
32  */
33 #include "postgres.h"
34
35 #include "access/clog.h"
36 #include "access/slru.h"
37 #include "access/transam.h"
38 #include "postmaster/bgwriter.h"
39
40 /*
41  * Defines for CLOG page sizes.  A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
46  * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
47  * take no explicit notice of that fact in this module, except when comparing
48  * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
49  */
50
51 /* We need two bits per xact, so four xacts fit in a byte */
52 #define CLOG_BITS_PER_XACT      2
53 #define CLOG_XACTS_PER_BYTE 4
54 #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
55 #define CLOG_XACT_BITMASK       ((1 << CLOG_BITS_PER_XACT) - 1)
56
57 #define TransactionIdToPage(xid)        ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
58 #define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
59 #define TransactionIdToByte(xid)        (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
60 #define TransactionIdToBIndex(xid)      ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
61
62 /* We store the latest async LSN for each group of transactions */
63 #define CLOG_XACTS_PER_LSN_GROUP        32      /* keep this a power of 2 */
64 #define CLOG_LSNS_PER_PAGE      (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)
65
66 #define GetLSNIndex(slotno, xid)        ((slotno) * CLOG_LSNS_PER_PAGE + \
67         ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
68
69
70 /*
71  * Link to shared-memory data structures for CLOG control
72  */
73 static SlruCtlData ClogCtlData;
74
75 #define ClogCtl (&ClogCtlData)
76
77
78 static int      ZeroCLOGPage(int pageno, bool writeXlog);
79 static bool CLOGPagePrecedes(int page1, int page2);
80 static void WriteZeroPageXlogRec(int pageno);
81 static void WriteTruncateXlogRec(int pageno);
82
83
84 /*
85  * Record the final state of a transaction in the commit log.
86  *
87  * lsn must be the WAL location of the commit record when recording an async
88  * commit.      For a synchronous commit it can be InvalidXLogRecPtr, since the
89  * caller guarantees the commit record is already flushed in that case.  It
90  * should be InvalidXLogRecPtr for abort cases, too.
91  *
92  * NB: this is a low-level routine and is NOT the preferred entry point
93  * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
94  */
95 void
96 TransactionIdSetStatus(TransactionId xid, XidStatus status, XLogRecPtr lsn)
97 {
98         int                     pageno = TransactionIdToPage(xid);
99         int                     byteno = TransactionIdToByte(xid);
100         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
101         int                     slotno;
102         char       *byteptr;
103         char            byteval;
104
105         Assert(status == TRANSACTION_STATUS_COMMITTED ||
106                    status == TRANSACTION_STATUS_ABORTED ||
107                    status == TRANSACTION_STATUS_SUB_COMMITTED);
108
109         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
110
111         /*
112          * If we're doing an async commit (ie, lsn is valid), then we must wait
113          * for any active write on the page slot to complete.  Otherwise our
114          * update could reach disk in that write, which will not do since we
115          * mustn't let it reach disk until we've done the appropriate WAL flush.
116          * But when lsn is invalid, it's OK to scribble on a page while it is
117          * write-busy, since we don't care if the update reaches disk sooner than
118          * we think.  Hence, pass write_ok = XLogRecPtrIsInvalid(lsn).
119          */
120         slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
121         byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
122
123         /* Current state should be 0, subcommitted or target state */
124         Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 ||
125                    ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == TRANSACTION_STATUS_SUB_COMMITTED ||
126                    ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == status);
127
128         /* note this assumes exclusive access to the clog page */
129         byteval = *byteptr;
130         byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
131         byteval |= (status << bshift);
132         *byteptr = byteval;
133
134         ClogCtl->shared->page_dirty[slotno] = true;
135
136         /*
137          * Update the group LSN if the transaction completion LSN is higher.
138          *
139          * Note: lsn will be invalid when supplied during InRecovery processing,
140          * so we don't need to do anything special to avoid LSN updates during
141          * recovery. After recovery completes the next clog change will set the
142          * LSN correctly.
143          */
144         if (!XLogRecPtrIsInvalid(lsn))
145         {
146                 int                     lsnindex = GetLSNIndex(slotno, xid);
147
148                 if (XLByteLT(ClogCtl->shared->group_lsn[lsnindex], lsn))
149                         ClogCtl->shared->group_lsn[lsnindex] = lsn;
150         }
151
152         LWLockRelease(CLogControlLock);
153 }
154
155 /*
156  * Interrogate the state of a transaction in the commit log.
157  *
158  * Aside from the actual commit status, this function returns (into *lsn)
159  * an LSN that is late enough to be able to guarantee that if we flush up to
160  * that LSN then we will have flushed the transaction's commit record to disk.
161  * The result is not necessarily the exact LSN of the transaction's commit
162  * record!      For example, for long-past transactions (those whose clog pages
163  * already migrated to disk), we'll return InvalidXLogRecPtr.  Also, because
164  * we group transactions on the same clog page to conserve storage, we might
165  * return the LSN of a later transaction that falls into the same group.
166  *
167  * NB: this is a low-level routine and is NOT the preferred entry point
168  * for most uses; TransactionLogFetch() in transam.c is the intended caller.
169  */
170 XidStatus
171 TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
172 {
173         int                     pageno = TransactionIdToPage(xid);
174         int                     byteno = TransactionIdToByte(xid);
175         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
176         int                     slotno;
177         int                     lsnindex;
178         char       *byteptr;
179         XidStatus       status;
180
181         /* lock is acquired by SimpleLruReadPage_ReadOnly */
182
183         slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);
184         byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
185
186         status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
187
188         lsnindex = GetLSNIndex(slotno, xid);
189         *lsn = ClogCtl->shared->group_lsn[lsnindex];
190
191         LWLockRelease(CLogControlLock);
192
193         return status;
194 }
195
196
197 /*
198  * Initialization of shared memory for CLOG
199  */
200 Size
201 CLOGShmemSize(void)
202 {
203         return SimpleLruShmemSize(NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE);
204 }
205
206 void
207 CLOGShmemInit(void)
208 {
209         ClogCtl->PagePrecedes = CLOGPagePrecedes;
210         SimpleLruInit(ClogCtl, "CLOG Ctl", NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE,
211                                   CLogControlLock, "pg_clog");
212 }
213
214 /*
215  * This func must be called ONCE on system install.  It creates
216  * the initial CLOG segment.  (The CLOG directory is assumed to
217  * have been created by the initdb shell script, and CLOGShmemInit
218  * must have been called already.)
219  */
220 void
221 BootStrapCLOG(void)
222 {
223         int                     slotno;
224
225         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
226
227         /* Create and zero the first page of the commit log */
228         slotno = ZeroCLOGPage(0, false);
229
230         /* Make sure it's written out */
231         SimpleLruWritePage(ClogCtl, slotno, NULL);
232         Assert(!ClogCtl->shared->page_dirty[slotno]);
233
234         LWLockRelease(CLogControlLock);
235 }
236
237 /*
238  * Initialize (or reinitialize) a page of CLOG to zeroes.
239  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
240  *
241  * The page is not actually written, just set up in shared memory.
242  * The slot number of the new page is returned.
243  *
244  * Control lock must be held at entry, and will be held at exit.
245  */
246 static int
247 ZeroCLOGPage(int pageno, bool writeXlog)
248 {
249         int                     slotno;
250
251         slotno = SimpleLruZeroPage(ClogCtl, pageno);
252
253         if (writeXlog)
254                 WriteZeroPageXlogRec(pageno);
255
256         return slotno;
257 }
258
259 /*
260  * This must be called ONCE during postmaster or standalone-backend startup,
261  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
262  */
263 void
264 StartupCLOG(void)
265 {
266         TransactionId xid = ShmemVariableCache->nextXid;
267         int                     pageno = TransactionIdToPage(xid);
268
269         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
270
271         /*
272          * Initialize our idea of the latest page number.
273          */
274         ClogCtl->shared->latest_page_number = pageno;
275
276         /*
277          * Zero out the remainder of the current clog page.  Under normal
278          * circumstances it should be zeroes already, but it seems at least
279          * theoretically possible that XLOG replay will have settled on a nextXID
280          * value that is less than the last XID actually used and marked by the
281          * previous database lifecycle (since subtransaction commit writes clog
282          * but makes no WAL entry).  Let's just be safe. (We need not worry about
283          * pages beyond the current one, since those will be zeroed when first
284          * used.  For the same reason, there is no need to do anything when
285          * nextXid is exactly at a page boundary; and it's likely that the
286          * "current" page doesn't exist yet in that case.)
287          */
288         if (TransactionIdToPgIndex(xid) != 0)
289         {
290                 int                     byteno = TransactionIdToByte(xid);
291                 int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
292                 int                     slotno;
293                 char       *byteptr;
294
295                 slotno = SimpleLruReadPage(ClogCtl, pageno, false, xid);
296                 byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
297
298                 /* Zero so-far-unused positions in the current byte */
299                 *byteptr &= (1 << bshift) - 1;
300                 /* Zero the rest of the page */
301                 MemSet(byteptr + 1, 0, BLCKSZ - byteno - 1);
302
303                 ClogCtl->shared->page_dirty[slotno] = true;
304         }
305
306         LWLockRelease(CLogControlLock);
307 }
308
309 /*
310  * This must be called ONCE during postmaster or standalone-backend shutdown
311  */
312 void
313 ShutdownCLOG(void)
314 {
315         /* Flush dirty CLOG pages to disk */
316         SimpleLruFlush(ClogCtl, false);
317 }
318
319 /*
320  * Perform a checkpoint --- either during shutdown, or on-the-fly
321  */
322 void
323 CheckPointCLOG(void)
324 {
325         /* Flush dirty CLOG pages to disk */
326         SimpleLruFlush(ClogCtl, true);
327 }
328
329
330 /*
331  * Make sure that CLOG has room for a newly-allocated XID.
332  *
333  * NB: this is called while holding XidGenLock.  We want it to be very fast
334  * most of the time; even when it's not so fast, no actual I/O need happen
335  * unless we're forced to write out a dirty clog or xlog page to make room
336  * in shared memory.
337  */
338 void
339 ExtendCLOG(TransactionId newestXact)
340 {
341         int                     pageno;
342
343         /*
344          * No work except at first XID of a page.  But beware: just after
345          * wraparound, the first XID of page zero is FirstNormalTransactionId.
346          */
347         if (TransactionIdToPgIndex(newestXact) != 0 &&
348                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
349                 return;
350
351         pageno = TransactionIdToPage(newestXact);
352
353         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
354
355         /* Zero the page and make an XLOG entry about it */
356         ZeroCLOGPage(pageno, true);
357
358         LWLockRelease(CLogControlLock);
359 }
360
361
362 /*
363  * Remove all CLOG segments before the one holding the passed transaction ID
364  *
365  * Before removing any CLOG data, we must flush XLOG to disk, to ensure
366  * that any recently-emitted HEAP_FREEZE records have reached disk; otherwise
367  * a crash and restart might leave us with some unfrozen tuples referencing
368  * removed CLOG data.  We choose to emit a special TRUNCATE XLOG record too.
369  * Replaying the deletion from XLOG is not critical, since the files could
370  * just as well be removed later, but doing so prevents a long-running hot
371  * standby server from acquiring an unreasonably bloated CLOG directory.
372  *
373  * Since CLOG segments hold a large number of transactions, the opportunity to
374  * actually remove a segment is fairly rare, and so it seems best not to do
375  * the XLOG flush unless we have confirmed that there is a removable segment.
376  */
377 void
378 TruncateCLOG(TransactionId oldestXact)
379 {
380         int                     cutoffPage;
381
382         /*
383          * The cutoff point is the start of the segment containing oldestXact. We
384          * pass the *page* containing oldestXact to SimpleLruTruncate.
385          */
386         cutoffPage = TransactionIdToPage(oldestXact);
387
388         /* Check to see if there's any files that could be removed */
389         if (!SlruScanDirectory(ClogCtl, cutoffPage, false))
390                 return;                                 /* nothing to remove */
391
392         /* Write XLOG record and flush XLOG to disk */
393         WriteTruncateXlogRec(cutoffPage);
394
395         /* Now we can remove the old CLOG segment(s) */
396         SimpleLruTruncate(ClogCtl, cutoffPage);
397 }
398
399
400 /*
401  * Decide which of two CLOG page numbers is "older" for truncation purposes.
402  *
403  * We need to use comparison of TransactionIds here in order to do the right
404  * thing with wraparound XID arithmetic.  However, if we are asked about
405  * page number zero, we don't want to hand InvalidTransactionId to
406  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
407  * offset both xids by FirstNormalTransactionId to avoid that.
408  */
409 static bool
410 CLOGPagePrecedes(int page1, int page2)
411 {
412         TransactionId xid1;
413         TransactionId xid2;
414
415         xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
416         xid1 += FirstNormalTransactionId;
417         xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
418         xid2 += FirstNormalTransactionId;
419
420         return TransactionIdPrecedes(xid1, xid2);
421 }
422
423
424 /*
425  * Write a ZEROPAGE xlog record
426  */
427 static void
428 WriteZeroPageXlogRec(int pageno)
429 {
430         XLogRecData rdata;
431
432         rdata.data = (char *) (&pageno);
433         rdata.len = sizeof(int);
434         rdata.buffer = InvalidBuffer;
435         rdata.next = NULL;
436         (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE, &rdata);
437 }
438
439 /*
440  * Write a TRUNCATE xlog record
441  *
442  * We must flush the xlog record to disk before returning --- see notes
443  * in TruncateCLOG().
444  */
445 static void
446 WriteTruncateXlogRec(int pageno)
447 {
448         XLogRecData rdata;
449         XLogRecPtr      recptr;
450
451         rdata.data = (char *) (&pageno);
452         rdata.len = sizeof(int);
453         rdata.buffer = InvalidBuffer;
454         rdata.next = NULL;
455         recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata);
456         XLogFlush(recptr);
457 }
458
459 /*
460  * CLOG resource manager's routines
461  */
462 void
463 clog_redo(XLogRecPtr lsn, XLogRecord *record)
464 {
465         uint8           info = record->xl_info & ~XLR_INFO_MASK;
466
467         if (info == CLOG_ZEROPAGE)
468         {
469                 int                     pageno;
470                 int                     slotno;
471
472                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
473
474                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
475
476                 slotno = ZeroCLOGPage(pageno, false);
477                 SimpleLruWritePage(ClogCtl, slotno, NULL);
478                 Assert(!ClogCtl->shared->page_dirty[slotno]);
479
480                 LWLockRelease(CLogControlLock);
481         }
482         else if (info == CLOG_TRUNCATE)
483         {
484                 int                     pageno;
485
486                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
487
488                 /*
489                  * During XLOG replay, latest_page_number isn't set up yet; insert a
490                  * suitable value to bypass the sanity test in SimpleLruTruncate.
491                  */
492                 ClogCtl->shared->latest_page_number = pageno;
493
494                 SimpleLruTruncate(ClogCtl, pageno);
495         }
496         else
497                 elog(PANIC, "clog_redo: unknown op code %u", info);
498 }
499
500 void
501 clog_desc(StringInfo buf, uint8 xl_info, char *rec)
502 {
503         uint8           info = xl_info & ~XLR_INFO_MASK;
504
505         if (info == CLOG_ZEROPAGE)
506         {
507                 int                     pageno;
508
509                 memcpy(&pageno, rec, sizeof(int));
510                 appendStringInfo(buf, "zeropage: %d", pageno);
511         }
512         else if (info == CLOG_TRUNCATE)
513         {
514                 int                     pageno;
515
516                 memcpy(&pageno, rec, sizeof(int));
517                 appendStringInfo(buf, "truncate before: %d", pageno);
518         }
519         else
520                 appendStringInfo(buf, "UNKNOWN");
521 }