]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/clog.c
pgindent run on all C files. Java run to follow. initdb/regression
[postgresql] / src / backend / access / transam / clog.c
1 /*-------------------------------------------------------------------------
2  *
3  * clog.c
4  *              PostgreSQL transaction-commit-log manager
5  *
6  * This module replaces the old "pg_log" access code, which treated pg_log
7  * essentially like a relation, in that it went through the regular buffer
8  * manager.  The problem with that was that there wasn't any good way to
9  * recycle storage space for transactions so old that they'll never be
10  * looked up again.  Now we use specialized access code so that the commit
11  * log can be broken into relatively small, independent segments.
12  *
13  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.5 2001/10/25 05:49:22 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <dirent.h>
24 #include <errno.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <unistd.h>
28
29 #include "access/clog.h"
30 #include "storage/lwlock.h"
31 #include "miscadmin.h"
32
33
34 /*
35  * Defines for CLOG page and segment sizes.  A page is the same BLCKSZ
36  * as is used everywhere else in Postgres.      The CLOG segment size can be
37  * chosen somewhat arbitrarily; we make it 1 million transactions by default,
38  * or 256Kb.
39  *
40  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
41  * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
42  * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
43  * take no explicit notice of that fact in this module, except when comparing
44  * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
45  */
46
47 #define CLOG_BLCKSZ                     BLCKSZ
48
49 /* We need two bits per xact, so four xacts fit in a byte */
50 #define CLOG_BITS_PER_XACT      2
51 #define CLOG_XACTS_PER_BYTE 4
52 #define CLOG_XACTS_PER_PAGE (CLOG_BLCKSZ * CLOG_XACTS_PER_BYTE)
53 #define CLOG_XACT_BITMASK       ((1 << CLOG_BITS_PER_XACT) - 1)
54
55 #define CLOG_XACTS_PER_SEGMENT  0x100000
56 #define CLOG_PAGES_PER_SEGMENT  (CLOG_XACTS_PER_SEGMENT / CLOG_XACTS_PER_PAGE)
57
58 #define TransactionIdToPage(xid)        ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
59 #define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
60 #define TransactionIdToByte(xid)        (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
61 #define TransactionIdToBIndex(xid)      ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
62
63
64 /*----------
65  * Shared-memory data structures for CLOG control
66  *
67  * We use a simple least-recently-used scheme to manage a pool of page
68  * buffers for the CLOG.  Under ordinary circumstances we expect that write
69  * traffic will occur mostly to the latest CLOG page (and to the just-prior
70  * page, soon after a page transition).  Read traffic will probably touch
71  * a larger span of pages, but in any case a fairly small number of page
72  * buffers should be sufficient.  So, we just search the buffers using plain
73  * linear search; there's no need for a hashtable or anything fancy.
74  * The management algorithm is straight LRU except that we will never swap
75  * out the latest page (since we know it's going to be hit again eventually).
76  *
77  * We use an overall LWLock to protect the shared data structures, plus
78  * per-buffer LWLocks that synchronize I/O for each buffer.  A process
79  * that is reading in or writing out a page buffer does not hold the control
80  * lock, only the per-buffer lock for the buffer it is working on.
81  *
82  * To change the page number or state of a buffer, one must normally hold
83  * the control lock.  (The sole exception to this rule is that a writer
84  * process changes the state from DIRTY to WRITE_IN_PROGRESS while holding
85  * only the per-buffer lock.)  If the buffer's state is neither EMPTY nor
86  * CLEAN, then there may be processes doing (or waiting to do) I/O on the
87  * buffer, so the page number may not be changed, and the only allowed state
88  * transition is to change WRITE_IN_PROGRESS to DIRTY after dirtying the page.
89  * To do any other state transition involving a buffer with potential I/O
90  * processes, one must hold both the per-buffer lock and the control lock.
91  * (Note the control lock must be acquired second; do not wait on a buffer
92  * lock while holding the control lock.)  A process wishing to read a page
93  * marks the buffer state as READ_IN_PROGRESS, then drops the control lock,
94  * acquires the per-buffer lock, and rechecks the state before proceeding.
95  * This recheck takes care of the possibility that someone else already did
96  * the read, while the early marking prevents someone else from trying to
97  * read the same page into a different buffer.
98  *
99  * Note we are assuming that read and write of the state value is atomic,
100  * since I/O processes may examine and change the state while not holding
101  * the control lock.
102  *
103  * As with the regular buffer manager, it is possible for another process
104  * to re-dirty a page that is currently being written out.      This is handled
105  * by setting the page's state from WRITE_IN_PROGRESS to DIRTY.  The writing
106  * process must notice this and not mark the page CLEAN when it's done.
107  *
108  * XLOG interactions: this module generates an XLOG record whenever a new
109  * CLOG page is initialized to zeroes.  Other writes of CLOG come from
110  * recording of transaction commit or abort in xact.c, which generates its
111  * own XLOG records for these events and will re-perform the status update
112  * on redo; so we need make no additional XLOG entry here.      Also, the XLOG
113  * is guaranteed flushed through the XLOG commit record before we are called
114  * to log a commit, so the WAL rule "write xlog before data" is satisfied
115  * automatically for commits, and we don't really care for aborts.  Therefore,
116  * we don't need to mark XLOG pages with LSN information; we have enough
117  * synchronization already.
118  *----------
119  */
120
121 typedef enum
122 {
123                                 CLOG_PAGE_EMPTY,/* CLOG buffer is not in use */
124                                 CLOG_PAGE_READ_IN_PROGRESS,             /* CLOG page is being read
125                                                                                                  * in */
126                                 CLOG_PAGE_CLEAN,/* CLOG page is valid and not dirty */
127                                 CLOG_PAGE_DIRTY,/* CLOG page is valid but needs write */
128                                 CLOG_PAGE_WRITE_IN_PROGRESS             /* CLOG page is being
129                                                                                                  * written out in */
130 } ClogPageStatus;
131
132 /*
133  * Shared-memory state for CLOG.
134  */
135 typedef struct ClogCtlData
136 {
137         /*
138          * Info for each buffer slot.  Page number is undefined when status is
139          * EMPTY.  lru_count is essentially the number of operations since
140          * last use of this page; the page with highest lru_count is the best
141          * candidate to replace.
142          */
143         char       *page_buffer[NUM_CLOG_BUFFERS];
144         ClogPageStatus page_status[NUM_CLOG_BUFFERS];
145         int                     page_number[NUM_CLOG_BUFFERS];
146         unsigned int page_lru_count[NUM_CLOG_BUFFERS];
147
148         /*
149          * latest_page_number is the page number of the current end of the
150          * CLOG; this is not critical data, since we use it only to avoid
151          * swapping out the latest page.
152          */
153         int                     latest_page_number;
154 } ClogCtlData;
155
156 static ClogCtlData *ClogCtl = NULL;
157
158 /*
159  * ClogBufferLocks is set during CLOGShmemInit and does not change thereafter.
160  * The value is automatically inherited by backends via fork, and
161  * doesn't need to be in shared memory.
162  */
163 static LWLockId ClogBufferLocks[NUM_CLOG_BUFFERS];              /* Per-buffer I/O locks */
164
165 /*
166  * ClogDir is set during CLOGShmemInit and does not change thereafter.
167  * The value is automatically inherited by backends via fork, and
168  * doesn't need to be in shared memory.
169  */
170 static char ClogDir[MAXPGPATH];
171
172 #define ClogFileName(path, seg) \
173         snprintf(path, MAXPGPATH, "%s/%04X", ClogDir, seg)
174
175 /*
176  * Macro to mark a buffer slot "most recently used".
177  */
178 #define ClogRecentlyUsed(slotno)        \
179         do { \
180                 int             iilru; \
181                 for (iilru = 0; iilru < NUM_CLOG_BUFFERS; iilru++) \
182                         ClogCtl->page_lru_count[iilru]++; \
183                 ClogCtl->page_lru_count[slotno] = 0; \
184         } while (0)
185
186
187 static int      ZeroCLOGPage(int pageno, bool writeXlog);
188 static int      ReadCLOGPage(int pageno);
189 static void WriteCLOGPage(int slotno);
190 static void CLOGPhysicalReadPage(int pageno, int slotno);
191 static void CLOGPhysicalWritePage(int pageno, int slotno);
192 static int      SelectLRUCLOGPage(int pageno);
193 static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions);
194 static bool CLOGPagePrecedes(int page1, int page2);
195 static void WriteZeroPageXlogRec(int pageno);
196
197
198 /*
199  * Record the final state of a transaction in the commit log.
200  *
201  * NB: this is a low-level routine and is NOT the preferred entry point
202  * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
203  */
204 void
205 TransactionIdSetStatus(TransactionId xid, XidStatus status)
206 {
207         int                     pageno = TransactionIdToPage(xid);
208         int                     byteno = TransactionIdToByte(xid);
209         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
210         int                     slotno;
211         char       *byteptr;
212
213         Assert(status == TRANSACTION_STATUS_COMMITTED ||
214                    status == TRANSACTION_STATUS_ABORTED);
215
216         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
217
218         slotno = ReadCLOGPage(pageno);
219         byteptr = ClogCtl->page_buffer[slotno] + byteno;
220
221         /* Current state should be 0 or target state */
222         Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 ||
223                    ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == status);
224
225         *byteptr |= (status << bshift);
226
227         ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;
228
229         LWLockRelease(CLogControlLock);
230 }
231
232 /*
233  * Interrogate the state of a transaction in the commit log.
234  *
235  * NB: this is a low-level routine and is NOT the preferred entry point
236  * for most uses; TransactionLogTest() in transam.c is the intended caller.
237  */
238 XidStatus
239 TransactionIdGetStatus(TransactionId xid)
240 {
241         int                     pageno = TransactionIdToPage(xid);
242         int                     byteno = TransactionIdToByte(xid);
243         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
244         int                     slotno;
245         char       *byteptr;
246         XidStatus       status;
247
248         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
249
250         slotno = ReadCLOGPage(pageno);
251         byteptr = ClogCtl->page_buffer[slotno] + byteno;
252
253         status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
254
255         LWLockRelease(CLogControlLock);
256
257         return status;
258 }
259
260
261 /*
262  * Initialization of shared memory for CLOG
263  */
264
265 int
266 CLOGShmemSize(void)
267 {
268         return MAXALIGN(sizeof(ClogCtlData) + CLOG_BLCKSZ * NUM_CLOG_BUFFERS);
269 }
270
271 void
272 CLOGShmemInit(void)
273 {
274         bool            found;
275         char       *bufptr;
276         int                     slotno;
277
278         /* this must agree with space requested by CLOGShmemSize() */
279         ClogCtl = (ClogCtlData *)
280                 ShmemInitStruct("CLOG Ctl",
281                                                 MAXALIGN(sizeof(ClogCtlData) +
282                                                                  CLOG_BLCKSZ * NUM_CLOG_BUFFERS),
283                                                 &found);
284         Assert(!found);
285
286         memset(ClogCtl, 0, sizeof(ClogCtlData));
287
288         bufptr = ((char *) ClogCtl) + sizeof(ClogCtlData);
289
290         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
291         {
292                 ClogCtl->page_buffer[slotno] = bufptr;
293                 ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
294                 ClogBufferLocks[slotno] = LWLockAssign();
295                 bufptr += CLOG_BLCKSZ;
296         }
297
298         /* ClogCtl->latest_page_number will be set later */
299
300         /* Init CLOG directory path */
301         snprintf(ClogDir, MAXPGPATH, "%s/pg_clog", DataDir);
302 }
303
304 /*
305  * This func must be called ONCE on system install.  It creates
306  * the initial CLOG segment.  (The CLOG directory is assumed to
307  * have been created by the initdb shell script, and CLOGShmemInit
308  * must have been called already.)
309  */
310 void
311 BootStrapCLOG(void)
312 {
313         int                     slotno;
314
315         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
316
317         /* Create and zero the first page of the commit log */
318         slotno = ZeroCLOGPage(0, false);
319
320         /* Make sure it's written out */
321         WriteCLOGPage(slotno);
322         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
323
324         LWLockRelease(CLogControlLock);
325 }
326
327 /*
328  * Initialize (or reinitialize) a page of CLOG to zeroes.
329  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
330  *
331  * The page is not actually written, just set up in shared memory.
332  * The slot number of the new page is returned.
333  *
334  * Control lock must be held at entry, and will be held at exit.
335  */
336 static int
337 ZeroCLOGPage(int pageno, bool writeXlog)
338 {
339         int                     slotno;
340
341         /* Find a suitable buffer slot for the page */
342         slotno = SelectLRUCLOGPage(pageno);
343         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
344                    ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN ||
345                    ClogCtl->page_number[slotno] == pageno);
346
347         /* Mark the slot as containing this page */
348         ClogCtl->page_number[slotno] = pageno;
349         ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;
350         ClogRecentlyUsed(slotno);
351
352         /* Set the buffer to zeroes */
353         MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
354
355         /* Assume this page is now the latest active page */
356         ClogCtl->latest_page_number = pageno;
357
358         if (writeXlog)
359                 WriteZeroPageXlogRec(pageno);
360
361         return slotno;
362 }
363
364 /*
365  * Find a CLOG page in a shared buffer, reading it in if necessary.
366  * The page number must correspond to an already-initialized page.
367  *
368  * Return value is the shared-buffer slot number now holding the page.
369  * The buffer's LRU access info is updated.
370  *
371  * Control lock must be held at entry, and will be held at exit.
372  */
373 static int
374 ReadCLOGPage(int pageno)
375 {
376         /* Outer loop handles restart if we lose the buffer to someone else */
377         for (;;)
378         {
379                 int                     slotno;
380
381                 /* See if page already is in memory; if not, pick victim slot */
382                 slotno = SelectLRUCLOGPage(pageno);
383
384                 /* Did we find the page in memory? */
385                 if (ClogCtl->page_number[slotno] == pageno &&
386                         ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
387                 {
388                         /* If page is still being read in, we cannot use it yet */
389                         if (ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
390                         {
391                                 /* otherwise, it's ready to use */
392                                 ClogRecentlyUsed(slotno);
393                                 return slotno;
394                         }
395                 }
396                 else
397                 {
398                         /* We found no match; assert we selected a freeable slot */
399                         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
400                                    ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
401                 }
402
403                 /* Mark the slot read-busy (no-op if it already was) */
404                 ClogCtl->page_number[slotno] = pageno;
405                 ClogCtl->page_status[slotno] = CLOG_PAGE_READ_IN_PROGRESS;
406
407                 /*
408                  * Temporarily mark page as recently-used to discourage
409                  * SelectLRUCLOGPage from selecting it again for someone else.
410                  */
411                 ClogCtl->page_lru_count[slotno] = 0;
412
413                 /* Release shared lock, grab per-buffer lock instead */
414                 LWLockRelease(CLogControlLock);
415                 LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
416
417                 /*
418                  * Check to see if someone else already did the read, or took the
419                  * buffer away from us.  If so, restart from the top.
420                  */
421                 if (ClogCtl->page_number[slotno] != pageno ||
422                         ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
423                 {
424                         LWLockRelease(ClogBufferLocks[slotno]);
425                         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
426                         continue;
427                 }
428
429                 /* Okay, do the read */
430                 CLOGPhysicalReadPage(pageno, slotno);
431
432                 /* Re-acquire shared control lock and update page state */
433                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
434
435                 Assert(ClogCtl->page_number[slotno] == pageno &&
436                          ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS);
437
438                 ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
439
440                 LWLockRelease(ClogBufferLocks[slotno]);
441
442                 ClogRecentlyUsed(slotno);
443                 return slotno;
444         }
445 }
446
447 /*
448  * Write a CLOG page from a shared buffer, if necessary.
449  * Does nothing if the specified slot is not dirty.
450  *
451  * NOTE: only one write attempt is made here.  Hence, it is possible that
452  * the page is still dirty at exit (if someone else re-dirtied it during
453  * the write).  However, we *do* attempt a fresh write even if the page
454  * is already being written; this is for checkpoints.
455  *
456  * Control lock must be held at entry, and will be held at exit.
457  */
458 static void
459 WriteCLOGPage(int slotno)
460 {
461         int                     pageno;
462
463         /* Do nothing if page does not need writing */
464         if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
465                 ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS)
466                 return;
467
468         pageno = ClogCtl->page_number[slotno];
469
470         /* Release shared lock, grab per-buffer lock instead */
471         LWLockRelease(CLogControlLock);
472         LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
473
474         /*
475          * Check to see if someone else already did the write, or took the
476          * buffer away from us.  If so, do nothing.  NOTE: we really should
477          * never see WRITE_IN_PROGRESS here, since that state should only
478          * occur while the writer is holding the buffer lock.  But accept it
479          * so that we have a recovery path if a writer aborts.
480          */
481         if (ClogCtl->page_number[slotno] != pageno ||
482                 (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
483                  ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS))
484         {
485                 LWLockRelease(ClogBufferLocks[slotno]);
486                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
487                 return;
488         }
489
490         /*
491          * Mark the slot write-busy.  After this point, a transaction status
492          * update on this page will mark it dirty again.  NB: we are assuming
493          * that read/write of the page status field is atomic, since we change
494          * the state while not holding control lock.  However, we cannot set
495          * this state any sooner, or we'd possibly fool a previous writer into
496          * thinking he's successfully dumped the page when he hasn't.
497          * (Scenario: other writer starts, page is redirtied, we come along
498          * and set WRITE_IN_PROGRESS again, other writer completes and sets
499          * CLEAN because redirty info has been lost, then we think it's clean
500          * too.)
501          */
502         ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS;
503
504         /* Okay, do the write */
505         CLOGPhysicalWritePage(pageno, slotno);
506
507         /* Re-acquire shared control lock and update page state */
508         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
509
510         Assert(ClogCtl->page_number[slotno] == pageno &&
511                    (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS ||
512                         ClogCtl->page_status[slotno] == CLOG_PAGE_DIRTY));
513
514         /* Cannot set CLEAN if someone re-dirtied page since write started */
515         if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS)
516                 ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
517
518         LWLockRelease(ClogBufferLocks[slotno]);
519 }
520
521 /*
522  * Physical read of a (previously existing) page into a buffer slot
523  *
524  * For now, assume it's not worth keeping a file pointer open across
525  * read/write operations.  We could cache one virtual file pointer ...
526  */
527 static void
528 CLOGPhysicalReadPage(int pageno, int slotno)
529 {
530         int                     segno = pageno / CLOG_PAGES_PER_SEGMENT;
531         int                     rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
532         int                     offset = rpageno * CLOG_BLCKSZ;
533         char            path[MAXPGPATH];
534         int                     fd;
535
536         ClogFileName(path, segno);
537
538         /*
539          * In a crash-and-restart situation, it's possible for us to receive
540          * commands to set the commit status of transactions whose bits are in
541          * already-truncated segments of the commit log (see notes in
542          * CLOGPhysicalWritePage).      Hence, if we are InRecovery, allow the
543          * case where the file doesn't exist, and return zeroes instead.
544          */
545         fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
546         if (fd < 0)
547         {
548                 if (errno != ENOENT || !InRecovery)
549                         elog(STOP, "open of %s failed: %m", path);
550                 elog(DEBUG, "clog file %s doesn't exist, reading as zeroes", path);
551                 MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
552                 return;
553         }
554
555         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
556                 elog(STOP, "lseek of clog file %u, offset %u failed: %m",
557                          segno, offset);
558
559         errno = 0;
560         if (read(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
561                 elog(STOP, "read of clog file %u, offset %u failed: %m",
562                          segno, offset);
563
564         close(fd);
565 }
566
567 /*
568  * Physical write of a page from a buffer slot
569  *
570  * For now, assume it's not worth keeping a file pointer open across
571  * read/write operations.  We could cache one virtual file pointer ...
572  */
573 static void
574 CLOGPhysicalWritePage(int pageno, int slotno)
575 {
576         int                     segno = pageno / CLOG_PAGES_PER_SEGMENT;
577         int                     rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
578         int                     offset = rpageno * CLOG_BLCKSZ;
579         char            path[MAXPGPATH];
580         int                     fd;
581
582         ClogFileName(path, segno);
583
584         /*
585          * If the file doesn't already exist, we should create it.  It is
586          * possible for this to need to happen when writing a page that's not
587          * first in its segment; we assume the OS can cope with that.  (Note:
588          * it might seem that it'd be okay to create files only when
589          * ZeroCLOGPage is called for the first page of a segment.      However,
590          * if after a crash and restart the REDO logic elects to replay the
591          * log from a checkpoint before the latest one, then it's possible
592          * that we will get commands to set transaction status of transactions
593          * that have already been truncated from the commit log.  Easiest way
594          * to deal with that is to accept references to nonexistent files here
595          * and in CLOGPhysicalReadPage.)
596          */
597         fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
598         if (fd < 0)
599         {
600                 if (errno != ENOENT)
601                         elog(STOP, "open of %s failed: %m", path);
602                 fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
603                                                    S_IRUSR | S_IWUSR);
604                 if (fd < 0)
605                         elog(STOP, "creation of file %s failed: %m", path);
606         }
607
608         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
609                 elog(STOP, "lseek of clog file %u, offset %u failed: %m",
610                          segno, offset);
611
612         errno = 0;
613         if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
614         {
615                 /* if write didn't set errno, assume problem is no disk space */
616                 if (errno == 0)
617                         errno = ENOSPC;
618                 elog(STOP, "write of clog file %u, offset %u failed: %m",
619                          segno, offset);
620         }
621
622         close(fd);
623 }
624
625 /*
626  * Select the slot to re-use when we need a free slot.
627  *
628  * The target page number is passed because we need to consider the
629  * possibility that some other process reads in the target page while
630  * we are doing I/O to free a slot.  Hence, check or recheck to see if
631  * any slot already holds the target page, and return that slot if so.
632  * Thus, the returned slot is *either* a slot already holding the pageno
633  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
634  * or CLEAN).
635  *
636  * Control lock must be held at entry, and will be held at exit.
637  */
638 static int
639 SelectLRUCLOGPage(int pageno)
640 {
641         /* Outer loop handles restart after I/O */
642         for (;;)
643         {
644                 int                     slotno;
645                 int                     bestslot = 0;
646                 unsigned int bestcount = 0;
647
648                 /* See if page already has a buffer assigned */
649                 for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
650                 {
651                         if (ClogCtl->page_number[slotno] == pageno &&
652                                 ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
653                                 return slotno;
654                 }
655
656                 /*
657                  * If we find any EMPTY slot, just select that one. Else locate
658                  * the least-recently-used slot that isn't the latest CLOG page.
659                  */
660                 for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
661                 {
662                         if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
663                                 return slotno;
664                         if (ClogCtl->page_lru_count[slotno] > bestcount &&
665                          ClogCtl->page_number[slotno] != ClogCtl->latest_page_number)
666                         {
667                                 bestslot = slotno;
668                                 bestcount = ClogCtl->page_lru_count[slotno];
669                         }
670                 }
671
672                 /*
673                  * If the selected page is clean, we're set.
674                  */
675                 if (ClogCtl->page_status[bestslot] == CLOG_PAGE_CLEAN)
676                         return bestslot;
677
678                 /*
679                  * We need to do I/O.  Normal case is that we have to write it
680                  * out, but it's possible in the worst case to have selected a
681                  * read-busy page.      In that case we use ReadCLOGPage to wait for
682                  * the read to complete.
683                  */
684                 if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS)
685                         (void) ReadCLOGPage(ClogCtl->page_number[bestslot]);
686                 else
687                         WriteCLOGPage(bestslot);
688
689                 /*
690                  * Now loop back and try again.  This is the easiest way of
691                  * dealing with corner cases such as the victim page being
692                  * re-dirtied while we wrote it.
693                  */
694         }
695 }
696
697 /*
698  * This must be called ONCE during postmaster or standalone-backend startup,
699  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
700  */
701 void
702 StartupCLOG(void)
703 {
704         /*
705          * Initialize our idea of the latest page number.
706          */
707         ClogCtl->latest_page_number = TransactionIdToPage(ShmemVariableCache->nextXid);
708 }
709
710 /*
711  * This must be called ONCE during postmaster or standalone-backend shutdown
712  */
713 void
714 ShutdownCLOG(void)
715 {
716         int                     slotno;
717
718         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
719
720         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
721         {
722                 WriteCLOGPage(slotno);
723                 Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
724                            ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
725         }
726
727         LWLockRelease(CLogControlLock);
728 }
729
730 /*
731  * Perform a checkpoint --- either during shutdown, or on-the-fly
732  */
733 void
734 CheckPointCLOG(void)
735 {
736         int                     slotno;
737
738         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
739
740         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
741         {
742                 WriteCLOGPage(slotno);
743
744                 /*
745                  * We cannot assert that the slot is clean now, since another
746                  * process might have re-dirtied it already.  That's okay.
747                  */
748         }
749
750         LWLockRelease(CLogControlLock);
751 }
752
753
754 /*
755  * Make sure that CLOG has room for a newly-allocated XID.
756  *
757  * NB: this is called while holding XidGenLock.  We want it to be very fast
758  * most of the time; even when it's not so fast, no actual I/O need happen
759  * unless we're forced to write out a dirty clog or xlog page to make room
760  * in shared memory.
761  */
762 void
763 ExtendCLOG(TransactionId newestXact)
764 {
765         int                     pageno;
766
767         /*
768          * No work except at first XID of a page.  But beware: just after
769          * wraparound, the first XID of page zero is FirstNormalTransactionId.
770          */
771         if (TransactionIdToPgIndex(newestXact) != 0 &&
772                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
773                 return;
774
775         pageno = TransactionIdToPage(newestXact);
776
777         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
778
779         /* Zero the page and make an XLOG entry about it */
780         ZeroCLOGPage(pageno, true);
781
782         LWLockRelease(CLogControlLock);
783 }
784
785
786 /*
787  * Remove all CLOG segments before the one holding the passed transaction ID
788  *
789  * When this is called, we know that the database logically contains no
790  * reference to transaction IDs older than oldestXact.  However, we must
791  * not truncate the CLOG until we have performed a checkpoint, to ensure
792  * that no such references remain on disk either; else a crash just after
793  * the truncation might leave us with a problem.  Since CLOG segments hold
794  * a large number of transactions, the opportunity to actually remove a
795  * segment is fairly rare, and so it seems best not to do the checkpoint
796  * unless we have confirmed that there is a removable segment.  Therefore
797  * we issue the checkpoint command here, not in higher-level code as might
798  * seem cleaner.
799  */
800 void
801 TruncateCLOG(TransactionId oldestXact)
802 {
803         int                     cutoffPage;
804         int                     slotno;
805
806         /*
807          * The cutoff point is the start of the segment containing oldestXact.
808          */
809         oldestXact -= oldestXact % CLOG_XACTS_PER_SEGMENT;
810         cutoffPage = TransactionIdToPage(oldestXact);
811
812         if (!ScanCLOGDirectory(cutoffPage, false))
813                 return;                                 /* nothing to remove */
814
815         /* Perform a CHECKPOINT */
816         CreateCheckPoint(false);
817
818         /*
819          * Scan CLOG shared memory and remove any pages preceding the cutoff
820          * page, to ensure we won't rewrite them later.  (Any dirty pages
821          * should have been flushed already during the checkpoint, we're just
822          * being extra careful here.)
823          */
824         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
825
826 restart:;
827
828         /*
829          * While we are holding the lock, make an important safety check: the
830          * planned cutoff point must be <= the current CLOG endpoint page.
831          * Otherwise we have already wrapped around, and proceeding with the
832          * truncation would risk removing the current CLOG segment.
833          */
834         if (CLOGPagePrecedes(ClogCtl->latest_page_number, cutoffPage))
835         {
836                 LWLockRelease(CLogControlLock);
837                 elog(LOG, "unable to truncate commit log: apparent wraparound");
838                 return;
839         }
840
841         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
842         {
843                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
844                         continue;
845                 if (!CLOGPagePrecedes(ClogCtl->page_number[slotno], cutoffPage))
846                         continue;
847
848                 /*
849                  * If page is CLEAN, just change state to EMPTY (expected case).
850                  */
851                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN)
852                 {
853                         ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
854                         continue;
855                 }
856
857                 /*
858                  * Hmm, we have (or may have) I/O operations acting on the page,
859                  * so we've got to wait for them to finish and then start again.
860                  * This is the same logic as in SelectLRUCLOGPage.
861                  */
862                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS)
863                         (void) ReadCLOGPage(ClogCtl->page_number[slotno]);
864                 else
865                         WriteCLOGPage(slotno);
866                 goto restart;
867         }
868
869         LWLockRelease(CLogControlLock);
870
871         /* Now we can remove the old CLOG segment(s) */
872         (void) ScanCLOGDirectory(cutoffPage, true);
873 }
874
875 /*
876  * TruncateCLOG subroutine: scan CLOG directory for removable segments.
877  * Actually remove them iff doDeletions is true.  Return TRUE iff any
878  * removable segments were found.  Note: no locking is needed.
879  */
880 static bool
881 ScanCLOGDirectory(int cutoffPage, bool doDeletions)
882 {
883         bool            found = false;
884         DIR                *cldir;
885         struct dirent *clde;
886         int                     segno;
887         int                     segpage;
888         char            path[MAXPGPATH];
889
890         cldir = opendir(ClogDir);
891         if (cldir == NULL)
892                 elog(STOP, "could not open transaction-commit log directory (%s): %m",
893                          ClogDir);
894
895         errno = 0;
896         while ((clde = readdir(cldir)) != NULL)
897         {
898                 if (strlen(clde->d_name) == 4 &&
899                         strspn(clde->d_name, "0123456789ABCDEF") == 4)
900                 {
901                         segno = (int) strtol(clde->d_name, NULL, 16);
902                         segpage = segno * CLOG_PAGES_PER_SEGMENT;
903                         if (CLOGPagePrecedes(segpage, cutoffPage))
904                         {
905                                 found = true;
906                                 if (doDeletions)
907                                 {
908                                         elog(LOG, "removing commit log file %s", clde->d_name);
909                                         snprintf(path, MAXPGPATH, "%s/%s", ClogDir, clde->d_name);
910                                         unlink(path);
911                                 }
912                         }
913                 }
914                 errno = 0;
915         }
916         if (errno)
917                 elog(STOP, "could not read transaction-commit log directory (%s): %m",
918                          ClogDir);
919         closedir(cldir);
920
921         return found;
922 }
923
924 /*
925  * Decide which of two CLOG page numbers is "older" for truncation purposes.
926  *
927  * We need to use comparison of TransactionIds here in order to do the right
928  * thing with wraparound XID arithmetic.  However, if we are asked about
929  * page number zero, we don't want to hand InvalidTransactionId to
930  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
931  * offset both xids by FirstNormalTransactionId to avoid that.
932  */
933 static bool
934 CLOGPagePrecedes(int page1, int page2)
935 {
936         TransactionId xid1;
937         TransactionId xid2;
938
939         xid1 = (TransactionId) page1 *CLOG_XACTS_PER_PAGE;
940
941         xid1 += FirstNormalTransactionId;
942         xid2 = (TransactionId) page2 *CLOG_XACTS_PER_PAGE;
943
944         xid2 += FirstNormalTransactionId;
945
946         return TransactionIdPrecedes(xid1, xid2);
947 }
948
949
950 /*
951  * Write a ZEROPAGE xlog record
952  *
953  * Note: xlog record is marked as outside transaction control, since we
954  * want it to be redone whether the invoking transaction commits or not.
955  * (Besides which, this is normally done just before entering a transaction.)
956  */
957 static void
958 WriteZeroPageXlogRec(int pageno)
959 {
960         XLogRecData rdata;
961
962         rdata.buffer = InvalidBuffer;
963         rdata.data = (char *) (&pageno);
964         rdata.len = sizeof(int);
965         rdata.next = NULL;
966         (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
967 }
968
969 /*
970  * CLOG resource manager's routines
971  */
972 void
973 clog_redo(XLogRecPtr lsn, XLogRecord *record)
974 {
975         uint8           info = record->xl_info & ~XLR_INFO_MASK;
976
977         if (info == CLOG_ZEROPAGE)
978         {
979                 int                     pageno;
980                 int                     slotno;
981
982                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
983
984                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
985
986                 slotno = ZeroCLOGPage(pageno, false);
987                 WriteCLOGPage(slotno);
988                 Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
989
990                 LWLockRelease(CLogControlLock);
991         }
992 }
993
994 void
995 clog_undo(XLogRecPtr lsn, XLogRecord *record)
996 {
997 }
998
999 void
1000 clog_desc(char *buf, uint8 xl_info, char *rec)
1001 {
1002         uint8           info = xl_info & ~XLR_INFO_MASK;
1003
1004         if (info == CLOG_ZEROPAGE)
1005         {
1006                 int                     pageno;
1007
1008                 memcpy(&pageno, rec, sizeof(int));
1009                 sprintf(buf + strlen(buf), "zeropage: %d", pageno);
1010         }
1011         else
1012                 strcat(buf, "UNKNOWN");
1013 }