]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/clog.c
Another pgindent run. Fixes enum indenting, and improves #endif
[postgresql] / src / backend / access / transam / clog.c
1 /*-------------------------------------------------------------------------
2  *
3  * clog.c
4  *              PostgreSQL transaction-commit-log manager
5  *
6  * This module replaces the old "pg_log" access code, which treated pg_log
7  * essentially like a relation, in that it went through the regular buffer
8  * manager.  The problem with that was that there wasn't any good way to
9  * recycle storage space for transactions so old that they'll never be
10  * looked up again.  Now we use specialized access code so that the commit
11  * log can be broken into relatively small, independent segments.
12  *
13  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.7 2001/10/28 06:25:42 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <dirent.h>
24 #include <errno.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <unistd.h>
28
29 #include "access/clog.h"
30 #include "storage/lwlock.h"
31 #include "miscadmin.h"
32
33
34 /*
35  * Defines for CLOG page and segment sizes.  A page is the same BLCKSZ
36  * as is used everywhere else in Postgres.      The CLOG segment size can be
37  * chosen somewhat arbitrarily; we make it 1 million transactions by default,
38  * or 256Kb.
39  *
40  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
41  * CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
42  * and CLOG segment numbering at 0xFFFFFFFF/CLOG_XACTS_PER_SEGMENT.  We need
43  * take no explicit notice of that fact in this module, except when comparing
44  * segment and page numbers in TruncateCLOG (see CLOGPagePrecedes).
45  */
46
47 #define CLOG_BLCKSZ                     BLCKSZ
48
49 /* We need two bits per xact, so four xacts fit in a byte */
50 #define CLOG_BITS_PER_XACT      2
51 #define CLOG_XACTS_PER_BYTE 4
52 #define CLOG_XACTS_PER_PAGE (CLOG_BLCKSZ * CLOG_XACTS_PER_BYTE)
53 #define CLOG_XACT_BITMASK       ((1 << CLOG_BITS_PER_XACT) - 1)
54
55 #define CLOG_XACTS_PER_SEGMENT  0x100000
56 #define CLOG_PAGES_PER_SEGMENT  (CLOG_XACTS_PER_SEGMENT / CLOG_XACTS_PER_PAGE)
57
58 #define TransactionIdToPage(xid)        ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
59 #define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
60 #define TransactionIdToByte(xid)        (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
61 #define TransactionIdToBIndex(xid)      ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
62
63
64 /*----------
65  * Shared-memory data structures for CLOG control
66  *
67  * We use a simple least-recently-used scheme to manage a pool of page
68  * buffers for the CLOG.  Under ordinary circumstances we expect that write
69  * traffic will occur mostly to the latest CLOG page (and to the just-prior
70  * page, soon after a page transition).  Read traffic will probably touch
71  * a larger span of pages, but in any case a fairly small number of page
72  * buffers should be sufficient.  So, we just search the buffers using plain
73  * linear search; there's no need for a hashtable or anything fancy.
74  * The management algorithm is straight LRU except that we will never swap
75  * out the latest page (since we know it's going to be hit again eventually).
76  *
77  * We use an overall LWLock to protect the shared data structures, plus
78  * per-buffer LWLocks that synchronize I/O for each buffer.  A process
79  * that is reading in or writing out a page buffer does not hold the control
80  * lock, only the per-buffer lock for the buffer it is working on.
81  *
82  * To change the page number or state of a buffer, one must normally hold
83  * the control lock.  (The sole exception to this rule is that a writer
84  * process changes the state from DIRTY to WRITE_IN_PROGRESS while holding
85  * only the per-buffer lock.)  If the buffer's state is neither EMPTY nor
86  * CLEAN, then there may be processes doing (or waiting to do) I/O on the
87  * buffer, so the page number may not be changed, and the only allowed state
88  * transition is to change WRITE_IN_PROGRESS to DIRTY after dirtying the page.
89  * To do any other state transition involving a buffer with potential I/O
90  * processes, one must hold both the per-buffer lock and the control lock.
91  * (Note the control lock must be acquired second; do not wait on a buffer
92  * lock while holding the control lock.)  A process wishing to read a page
93  * marks the buffer state as READ_IN_PROGRESS, then drops the control lock,
94  * acquires the per-buffer lock, and rechecks the state before proceeding.
95  * This recheck takes care of the possibility that someone else already did
96  * the read, while the early marking prevents someone else from trying to
97  * read the same page into a different buffer.
98  *
99  * Note we are assuming that read and write of the state value is atomic,
100  * since I/O processes may examine and change the state while not holding
101  * the control lock.
102  *
103  * As with the regular buffer manager, it is possible for another process
104  * to re-dirty a page that is currently being written out.      This is handled
105  * by setting the page's state from WRITE_IN_PROGRESS to DIRTY.  The writing
106  * process must notice this and not mark the page CLEAN when it's done.
107  *
108  * XLOG interactions: this module generates an XLOG record whenever a new
109  * CLOG page is initialized to zeroes.  Other writes of CLOG come from
110  * recording of transaction commit or abort in xact.c, which generates its
111  * own XLOG records for these events and will re-perform the status update
112  * on redo; so we need make no additional XLOG entry here.      Also, the XLOG
113  * is guaranteed flushed through the XLOG commit record before we are called
114  * to log a commit, so the WAL rule "write xlog before data" is satisfied
115  * automatically for commits, and we don't really care for aborts.  Therefore,
116  * we don't need to mark XLOG pages with LSN information; we have enough
117  * synchronization already.
118  *----------
119  */
120
121 typedef enum
122 {
123         CLOG_PAGE_EMPTY,                        /* CLOG buffer is not in use */
124         CLOG_PAGE_READ_IN_PROGRESS, /* CLOG page is being read in */
125         CLOG_PAGE_CLEAN,                        /* CLOG page is valid and not dirty */
126         CLOG_PAGE_DIRTY,                        /* CLOG page is valid but needs write */
127         CLOG_PAGE_WRITE_IN_PROGRESS /* CLOG page is being written out in */
128 } ClogPageStatus;
129
130 /*
131  * Shared-memory state for CLOG.
132  */
133 typedef struct ClogCtlData
134 {
135         /*
136          * Info for each buffer slot.  Page number is undefined when status is
137          * EMPTY.  lru_count is essentially the number of operations since
138          * last use of this page; the page with highest lru_count is the best
139          * candidate to replace.
140          */
141         char       *page_buffer[NUM_CLOG_BUFFERS];
142         ClogPageStatus page_status[NUM_CLOG_BUFFERS];
143         int                     page_number[NUM_CLOG_BUFFERS];
144         unsigned int page_lru_count[NUM_CLOG_BUFFERS];
145
146         /*
147          * latest_page_number is the page number of the current end of the
148          * CLOG; this is not critical data, since we use it only to avoid
149          * swapping out the latest page.
150          */
151         int                     latest_page_number;
152 } ClogCtlData;
153
154 static ClogCtlData *ClogCtl = NULL;
155
156 /*
157  * ClogBufferLocks is set during CLOGShmemInit and does not change thereafter.
158  * The value is automatically inherited by backends via fork, and
159  * doesn't need to be in shared memory.
160  */
161 static LWLockId ClogBufferLocks[NUM_CLOG_BUFFERS];              /* Per-buffer I/O locks */
162
163 /*
164  * ClogDir is set during CLOGShmemInit and does not change thereafter.
165  * The value is automatically inherited by backends via fork, and
166  * doesn't need to be in shared memory.
167  */
168 static char ClogDir[MAXPGPATH];
169
170 #define ClogFileName(path, seg) \
171         snprintf(path, MAXPGPATH, "%s/%04X", ClogDir, seg)
172
173 /*
174  * Macro to mark a buffer slot "most recently used".
175  */
176 #define ClogRecentlyUsed(slotno)        \
177         do { \
178                 int             iilru; \
179                 for (iilru = 0; iilru < NUM_CLOG_BUFFERS; iilru++) \
180                         ClogCtl->page_lru_count[iilru]++; \
181                 ClogCtl->page_lru_count[slotno] = 0; \
182         } while (0)
183
184
185 static int      ZeroCLOGPage(int pageno, bool writeXlog);
186 static int      ReadCLOGPage(int pageno);
187 static void WriteCLOGPage(int slotno);
188 static void CLOGPhysicalReadPage(int pageno, int slotno);
189 static void CLOGPhysicalWritePage(int pageno, int slotno);
190 static int      SelectLRUCLOGPage(int pageno);
191 static bool ScanCLOGDirectory(int cutoffPage, bool doDeletions);
192 static bool CLOGPagePrecedes(int page1, int page2);
193 static void WriteZeroPageXlogRec(int pageno);
194
195
196 /*
197  * Record the final state of a transaction in the commit log.
198  *
199  * NB: this is a low-level routine and is NOT the preferred entry point
200  * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
201  */
202 void
203 TransactionIdSetStatus(TransactionId xid, XidStatus status)
204 {
205         int                     pageno = TransactionIdToPage(xid);
206         int                     byteno = TransactionIdToByte(xid);
207         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
208         int                     slotno;
209         char       *byteptr;
210
211         Assert(status == TRANSACTION_STATUS_COMMITTED ||
212                    status == TRANSACTION_STATUS_ABORTED);
213
214         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
215
216         slotno = ReadCLOGPage(pageno);
217         byteptr = ClogCtl->page_buffer[slotno] + byteno;
218
219         /* Current state should be 0 or target state */
220         Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 ||
221                    ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == status);
222
223         *byteptr |= (status << bshift);
224
225         ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;
226
227         LWLockRelease(CLogControlLock);
228 }
229
230 /*
231  * Interrogate the state of a transaction in the commit log.
232  *
233  * NB: this is a low-level routine and is NOT the preferred entry point
234  * for most uses; TransactionLogTest() in transam.c is the intended caller.
235  */
236 XidStatus
237 TransactionIdGetStatus(TransactionId xid)
238 {
239         int                     pageno = TransactionIdToPage(xid);
240         int                     byteno = TransactionIdToByte(xid);
241         int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
242         int                     slotno;
243         char       *byteptr;
244         XidStatus       status;
245
246         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
247
248         slotno = ReadCLOGPage(pageno);
249         byteptr = ClogCtl->page_buffer[slotno] + byteno;
250
251         status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
252
253         LWLockRelease(CLogControlLock);
254
255         return status;
256 }
257
258
259 /*
260  * Initialization of shared memory for CLOG
261  */
262
263 int
264 CLOGShmemSize(void)
265 {
266         return MAXALIGN(sizeof(ClogCtlData) + CLOG_BLCKSZ * NUM_CLOG_BUFFERS);
267 }
268
269 void
270 CLOGShmemInit(void)
271 {
272         bool            found;
273         char       *bufptr;
274         int                     slotno;
275
276         /* this must agree with space requested by CLOGShmemSize() */
277         ClogCtl = (ClogCtlData *)
278                 ShmemInitStruct("CLOG Ctl",
279                                                 MAXALIGN(sizeof(ClogCtlData) +
280                                                                  CLOG_BLCKSZ * NUM_CLOG_BUFFERS),
281                                                 &found);
282         Assert(!found);
283
284         memset(ClogCtl, 0, sizeof(ClogCtlData));
285
286         bufptr = ((char *) ClogCtl) + sizeof(ClogCtlData);
287
288         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
289         {
290                 ClogCtl->page_buffer[slotno] = bufptr;
291                 ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
292                 ClogBufferLocks[slotno] = LWLockAssign();
293                 bufptr += CLOG_BLCKSZ;
294         }
295
296         /* ClogCtl->latest_page_number will be set later */
297
298         /* Init CLOG directory path */
299         snprintf(ClogDir, MAXPGPATH, "%s/pg_clog", DataDir);
300 }
301
302 /*
303  * This func must be called ONCE on system install.  It creates
304  * the initial CLOG segment.  (The CLOG directory is assumed to
305  * have been created by the initdb shell script, and CLOGShmemInit
306  * must have been called already.)
307  */
308 void
309 BootStrapCLOG(void)
310 {
311         int                     slotno;
312
313         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
314
315         /* Create and zero the first page of the commit log */
316         slotno = ZeroCLOGPage(0, false);
317
318         /* Make sure it's written out */
319         WriteCLOGPage(slotno);
320         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
321
322         LWLockRelease(CLogControlLock);
323 }
324
325 /*
326  * Initialize (or reinitialize) a page of CLOG to zeroes.
327  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
328  *
329  * The page is not actually written, just set up in shared memory.
330  * The slot number of the new page is returned.
331  *
332  * Control lock must be held at entry, and will be held at exit.
333  */
334 static int
335 ZeroCLOGPage(int pageno, bool writeXlog)
336 {
337         int                     slotno;
338
339         /* Find a suitable buffer slot for the page */
340         slotno = SelectLRUCLOGPage(pageno);
341         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
342                    ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN ||
343                    ClogCtl->page_number[slotno] == pageno);
344
345         /* Mark the slot as containing this page */
346         ClogCtl->page_number[slotno] = pageno;
347         ClogCtl->page_status[slotno] = CLOG_PAGE_DIRTY;
348         ClogRecentlyUsed(slotno);
349
350         /* Set the buffer to zeroes */
351         MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
352
353         /* Assume this page is now the latest active page */
354         ClogCtl->latest_page_number = pageno;
355
356         if (writeXlog)
357                 WriteZeroPageXlogRec(pageno);
358
359         return slotno;
360 }
361
362 /*
363  * Find a CLOG page in a shared buffer, reading it in if necessary.
364  * The page number must correspond to an already-initialized page.
365  *
366  * Return value is the shared-buffer slot number now holding the page.
367  * The buffer's LRU access info is updated.
368  *
369  * Control lock must be held at entry, and will be held at exit.
370  */
371 static int
372 ReadCLOGPage(int pageno)
373 {
374         /* Outer loop handles restart if we lose the buffer to someone else */
375         for (;;)
376         {
377                 int                     slotno;
378
379                 /* See if page already is in memory; if not, pick victim slot */
380                 slotno = SelectLRUCLOGPage(pageno);
381
382                 /* Did we find the page in memory? */
383                 if (ClogCtl->page_number[slotno] == pageno &&
384                         ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
385                 {
386                         /* If page is still being read in, we cannot use it yet */
387                         if (ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
388                         {
389                                 /* otherwise, it's ready to use */
390                                 ClogRecentlyUsed(slotno);
391                                 return slotno;
392                         }
393                 }
394                 else
395                 {
396                         /* We found no match; assert we selected a freeable slot */
397                         Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
398                                    ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
399                 }
400
401                 /* Mark the slot read-busy (no-op if it already was) */
402                 ClogCtl->page_number[slotno] = pageno;
403                 ClogCtl->page_status[slotno] = CLOG_PAGE_READ_IN_PROGRESS;
404
405                 /*
406                  * Temporarily mark page as recently-used to discourage
407                  * SelectLRUCLOGPage from selecting it again for someone else.
408                  */
409                 ClogCtl->page_lru_count[slotno] = 0;
410
411                 /* Release shared lock, grab per-buffer lock instead */
412                 LWLockRelease(CLogControlLock);
413                 LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
414
415                 /*
416                  * Check to see if someone else already did the read, or took the
417                  * buffer away from us.  If so, restart from the top.
418                  */
419                 if (ClogCtl->page_number[slotno] != pageno ||
420                         ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS)
421                 {
422                         LWLockRelease(ClogBufferLocks[slotno]);
423                         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
424                         continue;
425                 }
426
427                 /* Okay, do the read */
428                 CLOGPhysicalReadPage(pageno, slotno);
429
430                 /* Re-acquire shared control lock and update page state */
431                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
432
433                 Assert(ClogCtl->page_number[slotno] == pageno &&
434                          ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS);
435
436                 ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
437
438                 LWLockRelease(ClogBufferLocks[slotno]);
439
440                 ClogRecentlyUsed(slotno);
441                 return slotno;
442         }
443 }
444
445 /*
446  * Write a CLOG page from a shared buffer, if necessary.
447  * Does nothing if the specified slot is not dirty.
448  *
449  * NOTE: only one write attempt is made here.  Hence, it is possible that
450  * the page is still dirty at exit (if someone else re-dirtied it during
451  * the write).  However, we *do* attempt a fresh write even if the page
452  * is already being written; this is for checkpoints.
453  *
454  * Control lock must be held at entry, and will be held at exit.
455  */
456 static void
457 WriteCLOGPage(int slotno)
458 {
459         int                     pageno;
460
461         /* Do nothing if page does not need writing */
462         if (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
463                 ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS)
464                 return;
465
466         pageno = ClogCtl->page_number[slotno];
467
468         /* Release shared lock, grab per-buffer lock instead */
469         LWLockRelease(CLogControlLock);
470         LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE);
471
472         /*
473          * Check to see if someone else already did the write, or took the
474          * buffer away from us.  If so, do nothing.  NOTE: we really should
475          * never see WRITE_IN_PROGRESS here, since that state should only
476          * occur while the writer is holding the buffer lock.  But accept it
477          * so that we have a recovery path if a writer aborts.
478          */
479         if (ClogCtl->page_number[slotno] != pageno ||
480                 (ClogCtl->page_status[slotno] != CLOG_PAGE_DIRTY &&
481                  ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS))
482         {
483                 LWLockRelease(ClogBufferLocks[slotno]);
484                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
485                 return;
486         }
487
488         /*
489          * Mark the slot write-busy.  After this point, a transaction status
490          * update on this page will mark it dirty again.  NB: we are assuming
491          * that read/write of the page status field is atomic, since we change
492          * the state while not holding control lock.  However, we cannot set
493          * this state any sooner, or we'd possibly fool a previous writer into
494          * thinking he's successfully dumped the page when he hasn't.
495          * (Scenario: other writer starts, page is redirtied, we come along
496          * and set WRITE_IN_PROGRESS again, other writer completes and sets
497          * CLEAN because redirty info has been lost, then we think it's clean
498          * too.)
499          */
500         ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS;
501
502         /* Okay, do the write */
503         CLOGPhysicalWritePage(pageno, slotno);
504
505         /* Re-acquire shared control lock and update page state */
506         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
507
508         Assert(ClogCtl->page_number[slotno] == pageno &&
509                    (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS ||
510                         ClogCtl->page_status[slotno] == CLOG_PAGE_DIRTY));
511
512         /* Cannot set CLEAN if someone re-dirtied page since write started */
513         if (ClogCtl->page_status[slotno] == CLOG_PAGE_WRITE_IN_PROGRESS)
514                 ClogCtl->page_status[slotno] = CLOG_PAGE_CLEAN;
515
516         LWLockRelease(ClogBufferLocks[slotno]);
517 }
518
519 /*
520  * Physical read of a (previously existing) page into a buffer slot
521  *
522  * For now, assume it's not worth keeping a file pointer open across
523  * read/write operations.  We could cache one virtual file pointer ...
524  */
525 static void
526 CLOGPhysicalReadPage(int pageno, int slotno)
527 {
528         int                     segno = pageno / CLOG_PAGES_PER_SEGMENT;
529         int                     rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
530         int                     offset = rpageno * CLOG_BLCKSZ;
531         char            path[MAXPGPATH];
532         int                     fd;
533
534         ClogFileName(path, segno);
535
536         /*
537          * In a crash-and-restart situation, it's possible for us to receive
538          * commands to set the commit status of transactions whose bits are in
539          * already-truncated segments of the commit log (see notes in
540          * CLOGPhysicalWritePage).      Hence, if we are InRecovery, allow the
541          * case where the file doesn't exist, and return zeroes instead.
542          */
543         fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
544         if (fd < 0)
545         {
546                 if (errno != ENOENT || !InRecovery)
547                         elog(STOP, "open of %s failed: %m", path);
548                 elog(DEBUG, "clog file %s doesn't exist, reading as zeroes", path);
549                 MemSet(ClogCtl->page_buffer[slotno], 0, CLOG_BLCKSZ);
550                 return;
551         }
552
553         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
554                 elog(STOP, "lseek of clog file %u, offset %u failed: %m",
555                          segno, offset);
556
557         errno = 0;
558         if (read(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
559                 elog(STOP, "read of clog file %u, offset %u failed: %m",
560                          segno, offset);
561
562         close(fd);
563 }
564
565 /*
566  * Physical write of a page from a buffer slot
567  *
568  * For now, assume it's not worth keeping a file pointer open across
569  * read/write operations.  We could cache one virtual file pointer ...
570  */
571 static void
572 CLOGPhysicalWritePage(int pageno, int slotno)
573 {
574         int                     segno = pageno / CLOG_PAGES_PER_SEGMENT;
575         int                     rpageno = pageno % CLOG_PAGES_PER_SEGMENT;
576         int                     offset = rpageno * CLOG_BLCKSZ;
577         char            path[MAXPGPATH];
578         int                     fd;
579
580         ClogFileName(path, segno);
581
582         /*
583          * If the file doesn't already exist, we should create it.  It is
584          * possible for this to need to happen when writing a page that's not
585          * first in its segment; we assume the OS can cope with that.  (Note:
586          * it might seem that it'd be okay to create files only when
587          * ZeroCLOGPage is called for the first page of a segment.      However,
588          * if after a crash and restart the REDO logic elects to replay the
589          * log from a checkpoint before the latest one, then it's possible
590          * that we will get commands to set transaction status of transactions
591          * that have already been truncated from the commit log.  Easiest way
592          * to deal with that is to accept references to nonexistent files here
593          * and in CLOGPhysicalReadPage.)
594          */
595         fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
596         if (fd < 0)
597         {
598                 if (errno != ENOENT)
599                         elog(STOP, "open of %s failed: %m", path);
600                 fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
601                                                    S_IRUSR | S_IWUSR);
602                 if (fd < 0)
603                         elog(STOP, "creation of file %s failed: %m", path);
604         }
605
606         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
607                 elog(STOP, "lseek of clog file %u, offset %u failed: %m",
608                          segno, offset);
609
610         errno = 0;
611         if (write(fd, ClogCtl->page_buffer[slotno], CLOG_BLCKSZ) != CLOG_BLCKSZ)
612         {
613                 /* if write didn't set errno, assume problem is no disk space */
614                 if (errno == 0)
615                         errno = ENOSPC;
616                 elog(STOP, "write of clog file %u, offset %u failed: %m",
617                          segno, offset);
618         }
619
620         close(fd);
621 }
622
623 /*
624  * Select the slot to re-use when we need a free slot.
625  *
626  * The target page number is passed because we need to consider the
627  * possibility that some other process reads in the target page while
628  * we are doing I/O to free a slot.  Hence, check or recheck to see if
629  * any slot already holds the target page, and return that slot if so.
630  * Thus, the returned slot is *either* a slot already holding the pageno
631  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
632  * or CLEAN).
633  *
634  * Control lock must be held at entry, and will be held at exit.
635  */
636 static int
637 SelectLRUCLOGPage(int pageno)
638 {
639         /* Outer loop handles restart after I/O */
640         for (;;)
641         {
642                 int                     slotno;
643                 int                     bestslot = 0;
644                 unsigned int bestcount = 0;
645
646                 /* See if page already has a buffer assigned */
647                 for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
648                 {
649                         if (ClogCtl->page_number[slotno] == pageno &&
650                                 ClogCtl->page_status[slotno] != CLOG_PAGE_EMPTY)
651                                 return slotno;
652                 }
653
654                 /*
655                  * If we find any EMPTY slot, just select that one. Else locate
656                  * the least-recently-used slot that isn't the latest CLOG page.
657                  */
658                 for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
659                 {
660                         if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
661                                 return slotno;
662                         if (ClogCtl->page_lru_count[slotno] > bestcount &&
663                          ClogCtl->page_number[slotno] != ClogCtl->latest_page_number)
664                         {
665                                 bestslot = slotno;
666                                 bestcount = ClogCtl->page_lru_count[slotno];
667                         }
668                 }
669
670                 /*
671                  * If the selected page is clean, we're set.
672                  */
673                 if (ClogCtl->page_status[bestslot] == CLOG_PAGE_CLEAN)
674                         return bestslot;
675
676                 /*
677                  * We need to do I/O.  Normal case is that we have to write it
678                  * out, but it's possible in the worst case to have selected a
679                  * read-busy page.      In that case we use ReadCLOGPage to wait for
680                  * the read to complete.
681                  */
682                 if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS)
683                         (void) ReadCLOGPage(ClogCtl->page_number[bestslot]);
684                 else
685                         WriteCLOGPage(bestslot);
686
687                 /*
688                  * Now loop back and try again.  This is the easiest way of
689                  * dealing with corner cases such as the victim page being
690                  * re-dirtied while we wrote it.
691                  */
692         }
693 }
694
695 /*
696  * This must be called ONCE during postmaster or standalone-backend startup,
697  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
698  */
699 void
700 StartupCLOG(void)
701 {
702         /*
703          * Initialize our idea of the latest page number.
704          */
705         ClogCtl->latest_page_number = TransactionIdToPage(ShmemVariableCache->nextXid);
706 }
707
708 /*
709  * This must be called ONCE during postmaster or standalone-backend shutdown
710  */
711 void
712 ShutdownCLOG(void)
713 {
714         int                     slotno;
715
716         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
717
718         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
719         {
720                 WriteCLOGPage(slotno);
721                 Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY ||
722                            ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
723         }
724
725         LWLockRelease(CLogControlLock);
726 }
727
728 /*
729  * Perform a checkpoint --- either during shutdown, or on-the-fly
730  */
731 void
732 CheckPointCLOG(void)
733 {
734         int                     slotno;
735
736         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
737
738         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
739         {
740                 WriteCLOGPage(slotno);
741
742                 /*
743                  * We cannot assert that the slot is clean now, since another
744                  * process might have re-dirtied it already.  That's okay.
745                  */
746         }
747
748         LWLockRelease(CLogControlLock);
749 }
750
751
752 /*
753  * Make sure that CLOG has room for a newly-allocated XID.
754  *
755  * NB: this is called while holding XidGenLock.  We want it to be very fast
756  * most of the time; even when it's not so fast, no actual I/O need happen
757  * unless we're forced to write out a dirty clog or xlog page to make room
758  * in shared memory.
759  */
760 void
761 ExtendCLOG(TransactionId newestXact)
762 {
763         int                     pageno;
764
765         /*
766          * No work except at first XID of a page.  But beware: just after
767          * wraparound, the first XID of page zero is FirstNormalTransactionId.
768          */
769         if (TransactionIdToPgIndex(newestXact) != 0 &&
770                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
771                 return;
772
773         pageno = TransactionIdToPage(newestXact);
774
775         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
776
777         /* Zero the page and make an XLOG entry about it */
778         ZeroCLOGPage(pageno, true);
779
780         LWLockRelease(CLogControlLock);
781 }
782
783
784 /*
785  * Remove all CLOG segments before the one holding the passed transaction ID
786  *
787  * When this is called, we know that the database logically contains no
788  * reference to transaction IDs older than oldestXact.  However, we must
789  * not truncate the CLOG until we have performed a checkpoint, to ensure
790  * that no such references remain on disk either; else a crash just after
791  * the truncation might leave us with a problem.  Since CLOG segments hold
792  * a large number of transactions, the opportunity to actually remove a
793  * segment is fairly rare, and so it seems best not to do the checkpoint
794  * unless we have confirmed that there is a removable segment.  Therefore
795  * we issue the checkpoint command here, not in higher-level code as might
796  * seem cleaner.
797  */
798 void
799 TruncateCLOG(TransactionId oldestXact)
800 {
801         int                     cutoffPage;
802         int                     slotno;
803
804         /*
805          * The cutoff point is the start of the segment containing oldestXact.
806          */
807         oldestXact -= oldestXact % CLOG_XACTS_PER_SEGMENT;
808         cutoffPage = TransactionIdToPage(oldestXact);
809
810         if (!ScanCLOGDirectory(cutoffPage, false))
811                 return;                                 /* nothing to remove */
812
813         /* Perform a CHECKPOINT */
814         CreateCheckPoint(false);
815
816         /*
817          * Scan CLOG shared memory and remove any pages preceding the cutoff
818          * page, to ensure we won't rewrite them later.  (Any dirty pages
819          * should have been flushed already during the checkpoint, we're just
820          * being extra careful here.)
821          */
822         LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
823
824 restart:;
825
826         /*
827          * While we are holding the lock, make an important safety check: the
828          * planned cutoff point must be <= the current CLOG endpoint page.
829          * Otherwise we have already wrapped around, and proceeding with the
830          * truncation would risk removing the current CLOG segment.
831          */
832         if (CLOGPagePrecedes(ClogCtl->latest_page_number, cutoffPage))
833         {
834                 LWLockRelease(CLogControlLock);
835                 elog(LOG, "unable to truncate commit log: apparent wraparound");
836                 return;
837         }
838
839         for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
840         {
841                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_EMPTY)
842                         continue;
843                 if (!CLOGPagePrecedes(ClogCtl->page_number[slotno], cutoffPage))
844                         continue;
845
846                 /*
847                  * If page is CLEAN, just change state to EMPTY (expected case).
848                  */
849                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN)
850                 {
851                         ClogCtl->page_status[slotno] = CLOG_PAGE_EMPTY;
852                         continue;
853                 }
854
855                 /*
856                  * Hmm, we have (or may have) I/O operations acting on the page,
857                  * so we've got to wait for them to finish and then start again.
858                  * This is the same logic as in SelectLRUCLOGPage.
859                  */
860                 if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS)
861                         (void) ReadCLOGPage(ClogCtl->page_number[slotno]);
862                 else
863                         WriteCLOGPage(slotno);
864                 goto restart;
865         }
866
867         LWLockRelease(CLogControlLock);
868
869         /* Now we can remove the old CLOG segment(s) */
870         (void) ScanCLOGDirectory(cutoffPage, true);
871 }
872
873 /*
874  * TruncateCLOG subroutine: scan CLOG directory for removable segments.
875  * Actually remove them iff doDeletions is true.  Return TRUE iff any
876  * removable segments were found.  Note: no locking is needed.
877  */
878 static bool
879 ScanCLOGDirectory(int cutoffPage, bool doDeletions)
880 {
881         bool            found = false;
882         DIR                *cldir;
883         struct dirent *clde;
884         int                     segno;
885         int                     segpage;
886         char            path[MAXPGPATH];
887
888         cldir = opendir(ClogDir);
889         if (cldir == NULL)
890                 elog(STOP, "could not open transaction-commit log directory (%s): %m",
891                          ClogDir);
892
893         errno = 0;
894         while ((clde = readdir(cldir)) != NULL)
895         {
896                 if (strlen(clde->d_name) == 4 &&
897                         strspn(clde->d_name, "0123456789ABCDEF") == 4)
898                 {
899                         segno = (int) strtol(clde->d_name, NULL, 16);
900                         segpage = segno * CLOG_PAGES_PER_SEGMENT;
901                         if (CLOGPagePrecedes(segpage, cutoffPage))
902                         {
903                                 found = true;
904                                 if (doDeletions)
905                                 {
906                                         elog(LOG, "removing commit log file %s", clde->d_name);
907                                         snprintf(path, MAXPGPATH, "%s/%s", ClogDir, clde->d_name);
908                                         unlink(path);
909                                 }
910                         }
911                 }
912                 errno = 0;
913         }
914         if (errno)
915                 elog(STOP, "could not read transaction-commit log directory (%s): %m",
916                          ClogDir);
917         closedir(cldir);
918
919         return found;
920 }
921
922 /*
923  * Decide which of two CLOG page numbers is "older" for truncation purposes.
924  *
925  * We need to use comparison of TransactionIds here in order to do the right
926  * thing with wraparound XID arithmetic.  However, if we are asked about
927  * page number zero, we don't want to hand InvalidTransactionId to
928  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
929  * offset both xids by FirstNormalTransactionId to avoid that.
930  */
931 static bool
932 CLOGPagePrecedes(int page1, int page2)
933 {
934         TransactionId xid1;
935         TransactionId xid2;
936
937         xid1 = ((TransactionId) page1) * CLOG_XACTS_PER_PAGE;
938         xid1 += FirstNormalTransactionId;
939         xid2 = ((TransactionId) page2) * CLOG_XACTS_PER_PAGE;
940         xid2 += FirstNormalTransactionId;
941
942         return TransactionIdPrecedes(xid1, xid2);
943 }
944
945
946 /*
947  * Write a ZEROPAGE xlog record
948  *
949  * Note: xlog record is marked as outside transaction control, since we
950  * want it to be redone whether the invoking transaction commits or not.
951  * (Besides which, this is normally done just before entering a transaction.)
952  */
953 static void
954 WriteZeroPageXlogRec(int pageno)
955 {
956         XLogRecData rdata;
957
958         rdata.buffer = InvalidBuffer;
959         rdata.data = (char *) (&pageno);
960         rdata.len = sizeof(int);
961         rdata.next = NULL;
962         (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
963 }
964
965 /*
966  * CLOG resource manager's routines
967  */
968 void
969 clog_redo(XLogRecPtr lsn, XLogRecord *record)
970 {
971         uint8           info = record->xl_info & ~XLR_INFO_MASK;
972
973         if (info == CLOG_ZEROPAGE)
974         {
975                 int                     pageno;
976                 int                     slotno;
977
978                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
979
980                 LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
981
982                 slotno = ZeroCLOGPage(pageno, false);
983                 WriteCLOGPage(slotno);
984                 Assert(ClogCtl->page_status[slotno] == CLOG_PAGE_CLEAN);
985
986                 LWLockRelease(CLogControlLock);
987         }
988 }
989
990 void
991 clog_undo(XLogRecPtr lsn, XLogRecord *record)
992 {
993 }
994
995 void
996 clog_desc(char *buf, uint8 xl_info, char *rec)
997 {
998         uint8           info = xl_info & ~XLR_INFO_MASK;
999
1000         if (info == CLOG_ZEROPAGE)
1001         {
1002                 int                     pageno;
1003
1004                 memcpy(&pageno, rec, sizeof(int));
1005                 sprintf(buf + strlen(buf), "zeropage: %d", pageno);
1006         }
1007         else
1008                 strcat(buf, "UNKNOWN");
1009 }