]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/slru.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / access / transam / slru.c
1 /*-------------------------------------------------------------------------
2  *
3  * slru.c
4  *              Simple LRU buffering for transaction status logfiles
5  *
6  * We use a simple least-recently-used scheme to manage a pool of page
7  * buffers.  Under ordinary circumstances we expect that write
8  * traffic will occur mostly to the latest page (and to the just-prior
9  * page, soon after a page transition).  Read traffic will probably touch
10  * a larger span of pages, but in any case a fairly small number of page
11  * buffers should be sufficient.  So, we just search the buffers using plain
12  * linear search; there's no need for a hashtable or anything fancy.
13  * The management algorithm is straight LRU except that we will never swap
14  * out the latest page (since we know it's going to be hit again eventually).
15  *
16  * We use a control LWLock to protect the shared data structures, plus
17  * per-buffer LWLocks that synchronize I/O for each buffer.  The control lock
18  * must be held to examine or modify any shared state.  A process that is
19  * reading in or writing out a page buffer does not hold the control lock,
20  * only the per-buffer lock for the buffer it is working on.
21  *
22  * "Holding the control lock" means exclusive lock in all cases except for
23  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
24  * the implications of that.
25  *
26  * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
27  * before releasing the control lock.  The per-buffer lock is released after
28  * completing the I/O, re-acquiring the control lock, and updating the shared
29  * state.  (Deadlock is not possible here, because we never try to initiate
30  * I/O when someone else is already doing I/O on the same buffer.)
31  * To wait for I/O to complete, release the control lock, acquire the
32  * per-buffer lock in shared mode, immediately release the per-buffer lock,
33  * reacquire the control lock, and then recheck state (since arbitrary things
34  * could have happened while we didn't have the lock).
35  *
36  * As with the regular buffer manager, it is possible for another process
37  * to re-dirty a page that is currently being written out.  This is handled
38  * by re-setting the page's page_dirty flag.
39  *
40  *
41  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
42  * Portions Copyright (c) 1994, Regents of the University of California
43  *
44  * src/backend/access/transam/slru.c
45  *
46  *-------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <fcntl.h>
51 #include <sys/stat.h>
52 #include <unistd.h>
53
54 #include "access/slru.h"
55 #include "access/transam.h"
56 #include "access/xlog.h"
57 #include "pgstat.h"
58 #include "storage/fd.h"
59 #include "storage/shmem.h"
60 #include "miscadmin.h"
61
62
/*
 * Format the path of segment file "seg" into "path": the SLRU's directory
 * (ctl->Dir), a slash, and the segment number printed as at least four
 * upper-case hex digits.  Output is bounded by MAXPGPATH bytes.
 */
#define SlruFileName(ctl, path, seg) \
	snprintf((path), MAXPGPATH, "%s/%04X", (ctl)->Dir, (seg))
65
/*
 * Bookkeeping for files left open across one SimpleLruFlush() call.
 *
 * A flush normally needs to write/fsync only one or two physical files, but
 * may write several pages into each of them.  Keeping the files open until
 * control returns to SimpleLruFlush() lets those I/O requests be
 * consolidated; this structure remembers which files are currently open.
 */
#define MAX_FLUSH_BUFFERS	16

typedef struct SlruFlushData
{
	/* number of files actually open, i.e. valid entries in the arrays */
	int			num_files;
	/* file descriptors of the open files */
	int			fd[MAX_FLUSH_BUFFERS];
	/* log segment numbers of the open files, parallel to fd[] */
	int			segno[MAX_FLUSH_BUFFERS];
} SlruFlushData, *SlruFlush;
83
84 /*
85  * Macro to mark a buffer slot "most recently used".  Note multiple evaluation
86  * of arguments!
87  *
88  * The reason for the if-test is that there are often many consecutive
89  * accesses to the same page (particularly the latest page).  By suppressing
90  * useless increments of cur_lru_count, we reduce the probability that old
91  * pages' counts will "wrap around" and make them appear recently used.
92  *
93  * We allow this code to be executed concurrently by multiple processes within
94  * SimpleLruReadPage_ReadOnly().  As long as int reads and writes are atomic,
95  * this should not cause any completely-bogus values to enter the computation.
96  * However, it is possible for either cur_lru_count or individual
97  * page_lru_count entries to be "reset" to lower values than they should have,
98  * in case a process is delayed while it executes this macro.  With care in
99  * SlruSelectLRUPage(), this does little harm, and in any case the absolute
100  * worst possible consequence is a nonoptimal choice of page to evict.  The
101  * gain from allowing concurrent reads of SLRU pages seems worth it.
102  */
#define SlruRecentlyUsed(shared, slotno)	\
	do { \
		/* snapshot the global counter first, then compare with the slot */ \
		int		slru_tick_ = (shared)->cur_lru_count; \
		if ((shared)->page_lru_count[slotno] != slru_tick_) { \
			/* slot is not already the newest: advance and stamp it */ \
			slru_tick_++; \
			(shared)->cur_lru_count = slru_tick_; \
			(shared)->page_lru_count[slotno] = slru_tick_; \
		} \
	} while (0)
111
/*
 * Saved info for SlruReportIOError.
 *
 * The low-level I/O routines record which step failed, together with the
 * errno seen at that point, in the two static variables below instead of
 * reporting the problem themselves.
 */
typedef enum SlruErrorCause
{
	SLRU_OPEN_FAILED,			/* opening the segment file failed */
	SLRU_SEEK_FAILED,			/* lseek() on the segment file failed */
	SLRU_READ_FAILED,			/* read() did not return a full page */
	SLRU_WRITE_FAILED,			/* presumably a failed page write */
	SLRU_FSYNC_FAILED,			/* presumably a failed fsync */
	SLRU_CLOSE_FAILED			/* closing the segment file failed */
} SlruErrorCause;

/* which step failed in the most recent unsuccessful operation */
static SlruErrorCause slru_errcause;
/* errno captured at the time of that failure */
static int	slru_errno;
125
126
127 static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
128 static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
129 static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
130 static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
131 static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
132                                                                   SlruFlush fdata);
133 static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
134 static int      SlruSelectLRUPage(SlruCtl ctl, int pageno);
135
136 static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
137                                                                           int segpage, void *data);
138 static void SlruInternalDeleteSegment(SlruCtl ctl, char *filename);
139
140 /*
141  * Initialization of shared memory
142  */
143
144 Size
145 SimpleLruShmemSize(int nslots, int nlsns)
146 {
147         Size            sz;
148
149         /* we assume nslots isn't so large as to risk overflow */
150         sz = MAXALIGN(sizeof(SlruSharedData));
151         sz += MAXALIGN(nslots * sizeof(char *));        /* page_buffer[] */
152         sz += MAXALIGN(nslots * sizeof(SlruPageStatus));        /* page_status[] */
153         sz += MAXALIGN(nslots * sizeof(bool));  /* page_dirty[] */
154         sz += MAXALIGN(nslots * sizeof(int));   /* page_number[] */
155         sz += MAXALIGN(nslots * sizeof(int));   /* page_lru_count[] */
156         sz += MAXALIGN(nslots * sizeof(LWLockPadded));  /* buffer_locks[] */
157
158         if (nlsns > 0)
159                 sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));    /* group_lsn[] */
160
161         return BUFFERALIGN(sz) + BLCKSZ * nslots;
162 }
163
164 void
165 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
166                           LWLock *ctllock, const char *subdir, int tranche_id)
167 {
168         SlruShared      shared;
169         bool            found;
170
171         shared = (SlruShared) ShmemInitStruct(name,
172                                                                                   SimpleLruShmemSize(nslots, nlsns),
173                                                                                   &found);
174
175         if (!IsUnderPostmaster)
176         {
177                 /* Initialize locks and shared memory area */
178                 char       *ptr;
179                 Size            offset;
180                 int                     slotno;
181
182                 Assert(!found);
183
184                 memset(shared, 0, sizeof(SlruSharedData));
185
186                 shared->ControlLock = ctllock;
187
188                 shared->num_slots = nslots;
189                 shared->lsn_groups_per_page = nlsns;
190
191                 shared->cur_lru_count = 0;
192
193                 /* shared->latest_page_number will be set later */
194
195                 ptr = (char *) shared;
196                 offset = MAXALIGN(sizeof(SlruSharedData));
197                 shared->page_buffer = (char **) (ptr + offset);
198                 offset += MAXALIGN(nslots * sizeof(char *));
199                 shared->page_status = (SlruPageStatus *) (ptr + offset);
200                 offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
201                 shared->page_dirty = (bool *) (ptr + offset);
202                 offset += MAXALIGN(nslots * sizeof(bool));
203                 shared->page_number = (int *) (ptr + offset);
204                 offset += MAXALIGN(nslots * sizeof(int));
205                 shared->page_lru_count = (int *) (ptr + offset);
206                 offset += MAXALIGN(nslots * sizeof(int));
207
208                 /* Initialize LWLocks */
209                 shared->buffer_locks = (LWLockPadded *) (ptr + offset);
210                 offset += MAXALIGN(nslots * sizeof(LWLockPadded));
211
212                 if (nlsns > 0)
213                 {
214                         shared->group_lsn = (XLogRecPtr *) (ptr + offset);
215                         offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
216                 }
217
218                 Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
219                 strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
220                 shared->lwlock_tranche_id = tranche_id;
221
222                 ptr += BUFFERALIGN(offset);
223                 for (slotno = 0; slotno < nslots; slotno++)
224                 {
225                         LWLockInitialize(&shared->buffer_locks[slotno].lock,
226                                                          shared->lwlock_tranche_id);
227
228                         shared->page_buffer[slotno] = ptr;
229                         shared->page_status[slotno] = SLRU_PAGE_EMPTY;
230                         shared->page_dirty[slotno] = false;
231                         shared->page_lru_count[slotno] = 0;
232                         ptr += BLCKSZ;
233                 }
234
235                 /* Should fit to estimated shmem size */
236                 Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
237         }
238         else
239                 Assert(found);
240
241         /* Register SLRU tranche in the main tranches array */
242         LWLockRegisterTranche(shared->lwlock_tranche_id,
243                                                   shared->lwlock_tranche_name);
244
245         /*
246          * Initialize the unshared control struct, including directory path. We
247          * assume caller set PagePrecedes.
248          */
249         ctl->shared = shared;
250         ctl->do_fsync = true;           /* default behavior */
251         StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));
252 }
253
254 /*
255  * Initialize (or reinitialize) a page to zeroes.
256  *
257  * The page is not actually written, just set up in shared memory.
258  * The slot number of the new page is returned.
259  *
260  * Control lock must be held at entry, and will be held at exit.
261  */
262 int
263 SimpleLruZeroPage(SlruCtl ctl, int pageno)
264 {
265         SlruShared      shared = ctl->shared;
266         int                     slotno;
267
268         /* Find a suitable buffer slot for the page */
269         slotno = SlruSelectLRUPage(ctl, pageno);
270         Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
271                    (shared->page_status[slotno] == SLRU_PAGE_VALID &&
272                         !shared->page_dirty[slotno]) ||
273                    shared->page_number[slotno] == pageno);
274
275         /* Mark the slot as containing this page */
276         shared->page_number[slotno] = pageno;
277         shared->page_status[slotno] = SLRU_PAGE_VALID;
278         shared->page_dirty[slotno] = true;
279         SlruRecentlyUsed(shared, slotno);
280
281         /* Set the buffer to zeroes */
282         MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
283
284         /* Set the LSNs for this new page to zero */
285         SimpleLruZeroLSNs(ctl, slotno);
286
287         /* Assume this page is now the latest active page */
288         shared->latest_page_number = pageno;
289
290         return slotno;
291 }
292
293 /*
294  * Zero all the LSNs we store for this slru page.
295  *
296  * This should be called each time we create a new page, and each time we read
297  * in a page from disk into an existing buffer.  (Such an old page cannot
298  * have any interesting LSNs, since we'd have flushed them before writing
299  * the page in the first place.)
300  *
301  * This assumes that InvalidXLogRecPtr is bitwise-all-0.
302  */
303 static void
304 SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
305 {
306         SlruShared      shared = ctl->shared;
307
308         if (shared->lsn_groups_per_page > 0)
309                 MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
310                            shared->lsn_groups_per_page * sizeof(XLogRecPtr));
311 }
312
313 /*
314  * Wait for any active I/O on a page slot to finish.  (This does not
315  * guarantee that new I/O hasn't been started before we return, though.
316  * In fact the slot might not even contain the same page anymore.)
317  *
318  * Control lock must be held at entry, and will be held at exit.
319  */
320 static void
321 SimpleLruWaitIO(SlruCtl ctl, int slotno)
322 {
323         SlruShared      shared = ctl->shared;
324
325         /* See notes at top of file */
326         LWLockRelease(shared->ControlLock);
327         LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
328         LWLockRelease(&shared->buffer_locks[slotno].lock);
329         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
330
331         /*
332          * If the slot is still in an io-in-progress state, then either someone
333          * already started a new I/O on the slot, or a previous I/O failed and
334          * neglected to reset the page state.  That shouldn't happen, really, but
335          * it seems worth a few extra cycles to check and recover from it. We can
336          * cheaply test for failure by seeing if the buffer lock is still held (we
337          * assume that transaction abort would release the lock).
338          */
339         if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
340                 shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
341         {
342                 if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
343                 {
344                         /* indeed, the I/O must have failed */
345                         if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
346                                 shared->page_status[slotno] = SLRU_PAGE_EMPTY;
347                         else                            /* write_in_progress */
348                         {
349                                 shared->page_status[slotno] = SLRU_PAGE_VALID;
350                                 shared->page_dirty[slotno] = true;
351                         }
352                         LWLockRelease(&shared->buffer_locks[slotno].lock);
353                 }
354         }
355 }
356
357 /*
358  * Find a page in a shared buffer, reading it in if necessary.
359  * The page number must correspond to an already-initialized page.
360  *
361  * If write_ok is true then it is OK to return a page that is in
362  * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
363  * that modification of the page is safe.  If write_ok is false then we
364  * will not return the page until it is not undergoing active I/O.
365  *
366  * The passed-in xid is used only for error reporting, and may be
367  * InvalidTransactionId if no specific xid is associated with the action.
368  *
369  * Return value is the shared-buffer slot number now holding the page.
370  * The buffer's LRU access info is updated.
371  *
372  * Control lock must be held at entry, and will be held at exit.
373  */
374 int
375 SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
376                                   TransactionId xid)
377 {
378         SlruShared      shared = ctl->shared;
379
380         /* Outer loop handles restart if we must wait for someone else's I/O */
381         for (;;)
382         {
383                 int                     slotno;
384                 bool            ok;
385
386                 /* See if page already is in memory; if not, pick victim slot */
387                 slotno = SlruSelectLRUPage(ctl, pageno);
388
389                 /* Did we find the page in memory? */
390                 if (shared->page_number[slotno] == pageno &&
391                         shared->page_status[slotno] != SLRU_PAGE_EMPTY)
392                 {
393                         /*
394                          * If page is still being read in, we must wait for I/O.  Likewise
395                          * if the page is being written and the caller said that's not OK.
396                          */
397                         if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
398                                 (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
399                                  !write_ok))
400                         {
401                                 SimpleLruWaitIO(ctl, slotno);
402                                 /* Now we must recheck state from the top */
403                                 continue;
404                         }
405                         /* Otherwise, it's ready to use */
406                         SlruRecentlyUsed(shared, slotno);
407                         return slotno;
408                 }
409
410                 /* We found no match; assert we selected a freeable slot */
411                 Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
412                            (shared->page_status[slotno] == SLRU_PAGE_VALID &&
413                                 !shared->page_dirty[slotno]));
414
415                 /* Mark the slot read-busy */
416                 shared->page_number[slotno] = pageno;
417                 shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
418                 shared->page_dirty[slotno] = false;
419
420                 /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
421                 LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
422
423                 /* Release control lock while doing I/O */
424                 LWLockRelease(shared->ControlLock);
425
426                 /* Do the read */
427                 ok = SlruPhysicalReadPage(ctl, pageno, slotno);
428
429                 /* Set the LSNs for this newly read-in page to zero */
430                 SimpleLruZeroLSNs(ctl, slotno);
431
432                 /* Re-acquire control lock and update page state */
433                 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
434
435                 Assert(shared->page_number[slotno] == pageno &&
436                            shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
437                            !shared->page_dirty[slotno]);
438
439                 shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
440
441                 LWLockRelease(&shared->buffer_locks[slotno].lock);
442
443                 /* Now it's okay to ereport if we failed */
444                 if (!ok)
445                         SlruReportIOError(ctl, pageno, xid);
446
447                 SlruRecentlyUsed(shared, slotno);
448                 return slotno;
449         }
450 }
451
452 /*
453  * Find a page in a shared buffer, reading it in if necessary.
454  * The page number must correspond to an already-initialized page.
455  * The caller must intend only read-only access to the page.
456  *
457  * The passed-in xid is used only for error reporting, and may be
458  * InvalidTransactionId if no specific xid is associated with the action.
459  *
460  * Return value is the shared-buffer slot number now holding the page.
461  * The buffer's LRU access info is updated.
462  *
463  * Control lock must NOT be held at entry, but will be held at exit.
464  * It is unspecified whether the lock will be shared or exclusive.
465  */
466 int
467 SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
468 {
469         SlruShared      shared = ctl->shared;
470         int                     slotno;
471
472         /* Try to find the page while holding only shared lock */
473         LWLockAcquire(shared->ControlLock, LW_SHARED);
474
475         /* See if page is already in a buffer */
476         for (slotno = 0; slotno < shared->num_slots; slotno++)
477         {
478                 if (shared->page_number[slotno] == pageno &&
479                         shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
480                         shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
481                 {
482                         /* See comments for SlruRecentlyUsed macro */
483                         SlruRecentlyUsed(shared, slotno);
484                         return slotno;
485                 }
486         }
487
488         /* No luck, so switch to normal exclusive lock and do regular read */
489         LWLockRelease(shared->ControlLock);
490         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
491
492         return SimpleLruReadPage(ctl, pageno, true, xid);
493 }
494
495 /*
496  * Write a page from a shared buffer, if necessary.
497  * Does nothing if the specified slot is not dirty.
498  *
499  * NOTE: only one write attempt is made here.  Hence, it is possible that
500  * the page is still dirty at exit (if someone else re-dirtied it during
501  * the write).  However, we *do* attempt a fresh write even if the page
502  * is already being written; this is for checkpoints.
503  *
504  * Control lock must be held at entry, and will be held at exit.
505  */
506 static void
507 SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
508 {
509         SlruShared      shared = ctl->shared;
510         int                     pageno = shared->page_number[slotno];
511         bool            ok;
512
513         /* If a write is in progress, wait for it to finish */
514         while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
515                    shared->page_number[slotno] == pageno)
516         {
517                 SimpleLruWaitIO(ctl, slotno);
518         }
519
520         /*
521          * Do nothing if page is not dirty, or if buffer no longer contains the
522          * same page we were called for.
523          */
524         if (!shared->page_dirty[slotno] ||
525                 shared->page_status[slotno] != SLRU_PAGE_VALID ||
526                 shared->page_number[slotno] != pageno)
527                 return;
528
529         /*
530          * Mark the slot write-busy, and clear the dirtybit.  After this point, a
531          * transaction status update on this page will mark it dirty again.
532          */
533         shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
534         shared->page_dirty[slotno] = false;
535
536         /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
537         LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
538
539         /* Release control lock while doing I/O */
540         LWLockRelease(shared->ControlLock);
541
542         /* Do the write */
543         ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
544
545         /* If we failed, and we're in a flush, better close the files */
546         if (!ok && fdata)
547         {
548                 int                     i;
549
550                 for (i = 0; i < fdata->num_files; i++)
551                         CloseTransientFile(fdata->fd[i]);
552         }
553
554         /* Re-acquire control lock and update page state */
555         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
556
557         Assert(shared->page_number[slotno] == pageno &&
558                    shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
559
560         /* If we failed to write, mark the page dirty again */
561         if (!ok)
562                 shared->page_dirty[slotno] = true;
563
564         shared->page_status[slotno] = SLRU_PAGE_VALID;
565
566         LWLockRelease(&shared->buffer_locks[slotno].lock);
567
568         /* Now it's okay to ereport if we failed */
569         if (!ok)
570                 SlruReportIOError(ctl, pageno, InvalidTransactionId);
571 }
572
573 /*
574  * Wrapper of SlruInternalWritePage, for external callers.
575  * fdata is always passed a NULL here.
576  */
577 void
578 SimpleLruWritePage(SlruCtl ctl, int slotno)
579 {
580         SlruInternalWritePage(ctl, slotno, NULL);
581 }
582
583 /*
584  * Return whether the given page exists on disk.
585  *
586  * A false return means that either the file does not exist, or that it's not
587  * large enough to contain the given page.
588  */
589 bool
590 SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
591 {
592         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
593         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
594         int                     offset = rpageno * BLCKSZ;
595         char            path[MAXPGPATH];
596         int                     fd;
597         bool            result;
598         off_t           endpos;
599
600         SlruFileName(ctl, path, segno);
601
602         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
603         if (fd < 0)
604         {
605                 /* expected: file doesn't exist */
606                 if (errno == ENOENT)
607                         return false;
608
609                 /* report error normally */
610                 slru_errcause = SLRU_OPEN_FAILED;
611                 slru_errno = errno;
612                 SlruReportIOError(ctl, pageno, 0);
613         }
614
615         if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
616         {
617                 slru_errcause = SLRU_SEEK_FAILED;
618                 slru_errno = errno;
619                 SlruReportIOError(ctl, pageno, 0);
620         }
621
622         result = endpos >= (off_t) (offset + BLCKSZ);
623
624         if (CloseTransientFile(fd) != 0)
625         {
626                 slru_errcause = SLRU_CLOSE_FAILED;
627                 slru_errno = errno;
628                 return false;
629         }
630
631         return result;
632 }
633
634 /*
635  * Physical read of a (previously existing) page into a buffer slot
636  *
637  * On failure, we cannot just ereport(ERROR) since caller has put state in
638  * shared memory that must be undone.  So, we return false and save enough
639  * info in static variables to let SlruReportIOError make the report.
640  *
641  * For now, assume it's not worth keeping a file pointer open across
642  * read/write operations.  We could cache one virtual file pointer ...
643  */
644 static bool
645 SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
646 {
647         SlruShared      shared = ctl->shared;
648         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
649         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
650         int                     offset = rpageno * BLCKSZ;
651         char            path[MAXPGPATH];
652         int                     fd;
653
654         SlruFileName(ctl, path, segno);
655
656         /*
657          * In a crash-and-restart situation, it's possible for us to receive
658          * commands to set the commit status of transactions whose bits are in
659          * already-truncated segments of the commit log (see notes in
660          * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
661          * where the file doesn't exist, and return zeroes instead.
662          */
663         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
664         if (fd < 0)
665         {
666                 if (errno != ENOENT || !InRecovery)
667                 {
668                         slru_errcause = SLRU_OPEN_FAILED;
669                         slru_errno = errno;
670                         return false;
671                 }
672
673                 ereport(LOG,
674                                 (errmsg("file \"%s\" doesn't exist, reading as zeroes",
675                                                 path)));
676                 MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
677                 return true;
678         }
679
680         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
681         {
682                 slru_errcause = SLRU_SEEK_FAILED;
683                 slru_errno = errno;
684                 CloseTransientFile(fd);
685                 return false;
686         }
687
688         errno = 0;
689         pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);
690         if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
691         {
692                 pgstat_report_wait_end();
693                 slru_errcause = SLRU_READ_FAILED;
694                 slru_errno = errno;
695                 CloseTransientFile(fd);
696                 return false;
697         }
698         pgstat_report_wait_end();
699
700         if (CloseTransientFile(fd) != 0)
701         {
702                 slru_errcause = SLRU_CLOSE_FAILED;
703                 slru_errno = errno;
704                 return false;
705         }
706
707         return true;
708 }
709
710 /*
711  * Physical write of a page from a buffer slot
712  *
713  * On failure, we cannot just ereport(ERROR) since caller has put state in
714  * shared memory that must be undone.  So, we return false and save enough
715  * info in static variables to let SlruReportIOError make the report.
716  *
717  * For now, assume it's not worth keeping a file pointer open across
718  * independent read/write operations.  We do batch operations during
719  * SimpleLruFlush, though.
720  *
721  * fdata is NULL for a standalone write, pointer to open-file info during
722  * SimpleLruFlush.
723  */
724 static bool
725 SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
726 {
727         SlruShared      shared = ctl->shared;
728         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
729         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
730         int                     offset = rpageno * BLCKSZ;
731         char            path[MAXPGPATH];
732         int                     fd = -1;
733
734         /*
735          * Honor the write-WAL-before-data rule, if appropriate, so that we do not
736          * write out data before associated WAL records.  This is the same action
737          * performed during FlushBuffer() in the main buffer manager.
738          */
739         if (shared->group_lsn != NULL)
740         {
741                 /*
742                  * We must determine the largest async-commit LSN for the page. This
743                  * is a bit tedious, but since this entire function is a slow path
744                  * anyway, it seems better to do this here than to maintain a per-page
745                  * LSN variable (which'd need an extra comparison in the
746                  * transaction-commit path).
747                  */
748                 XLogRecPtr      max_lsn;
749                 int                     lsnindex,
750                                         lsnoff;
751
752                 lsnindex = slotno * shared->lsn_groups_per_page;
753                 max_lsn = shared->group_lsn[lsnindex++];
754                 for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
755                 {
756                         XLogRecPtr      this_lsn = shared->group_lsn[lsnindex++];
757
758                         if (max_lsn < this_lsn)
759                                 max_lsn = this_lsn;
760                 }
761
762                 if (!XLogRecPtrIsInvalid(max_lsn))
763                 {
764                         /*
765                          * As noted above, elog(ERROR) is not acceptable here, so if
766                          * XLogFlush were to fail, we must PANIC.  This isn't much of a
767                          * restriction because XLogFlush is just about all critical
768                          * section anyway, but let's make sure.
769                          */
770                         START_CRIT_SECTION();
771                         XLogFlush(max_lsn);
772                         END_CRIT_SECTION();
773                 }
774         }
775
776         /*
777          * During a Flush, we may already have the desired file open.
778          */
779         if (fdata)
780         {
781                 int                     i;
782
783                 for (i = 0; i < fdata->num_files; i++)
784                 {
785                         if (fdata->segno[i] == segno)
786                         {
787                                 fd = fdata->fd[i];
788                                 break;
789                         }
790                 }
791         }
792
793         if (fd < 0)
794         {
795                 /*
796                  * If the file doesn't already exist, we should create it.  It is
797                  * possible for this to need to happen when writing a page that's not
798                  * first in its segment; we assume the OS can cope with that. (Note:
799                  * it might seem that it'd be okay to create files only when
800                  * SimpleLruZeroPage is called for the first page of a segment.
801                  * However, if after a crash and restart the REDO logic elects to
802                  * replay the log from a checkpoint before the latest one, then it's
803                  * possible that we will get commands to set transaction status of
804                  * transactions that have already been truncated from the commit log.
805                  * Easiest way to deal with that is to accept references to
806                  * nonexistent files here and in SlruPhysicalReadPage.)
807                  *
808                  * Note: it is possible for more than one backend to be executing this
809                  * code simultaneously for different pages of the same file. Hence,
810                  * don't use O_EXCL or O_TRUNC or anything like that.
811                  */
812                 SlruFileName(ctl, path, segno);
813                 fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY);
814                 if (fd < 0)
815                 {
816                         slru_errcause = SLRU_OPEN_FAILED;
817                         slru_errno = errno;
818                         return false;
819                 }
820
821                 if (fdata)
822                 {
823                         if (fdata->num_files < MAX_FLUSH_BUFFERS)
824                         {
825                                 fdata->fd[fdata->num_files] = fd;
826                                 fdata->segno[fdata->num_files] = segno;
827                                 fdata->num_files++;
828                         }
829                         else
830                         {
831                                 /*
832                                  * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
833                                  * fall back to treating it as a standalone write.
834                                  */
835                                 fdata = NULL;
836                         }
837                 }
838         }
839
840         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
841         {
842                 slru_errcause = SLRU_SEEK_FAILED;
843                 slru_errno = errno;
844                 if (!fdata)
845                         CloseTransientFile(fd);
846                 return false;
847         }
848
849         errno = 0;
850         pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
851         if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
852         {
853                 pgstat_report_wait_end();
854                 /* if write didn't set errno, assume problem is no disk space */
855                 if (errno == 0)
856                         errno = ENOSPC;
857                 slru_errcause = SLRU_WRITE_FAILED;
858                 slru_errno = errno;
859                 if (!fdata)
860                         CloseTransientFile(fd);
861                 return false;
862         }
863         pgstat_report_wait_end();
864
865         /*
866          * If not part of Flush, need to fsync now.  We assume this happens
867          * infrequently enough that it's not a performance issue.
868          */
869         if (!fdata)
870         {
871                 pgstat_report_wait_start(WAIT_EVENT_SLRU_SYNC);
872                 if (ctl->do_fsync && pg_fsync(fd) != 0)
873                 {
874                         pgstat_report_wait_end();
875                         slru_errcause = SLRU_FSYNC_FAILED;
876                         slru_errno = errno;
877                         CloseTransientFile(fd);
878                         return false;
879                 }
880                 pgstat_report_wait_end();
881
882                 if (CloseTransientFile(fd) != 0)
883                 {
884                         slru_errcause = SLRU_CLOSE_FAILED;
885                         slru_errno = errno;
886                         return false;
887                 }
888         }
889
890         return true;
891 }
892
893 /*
894  * Issue the error message after failure of SlruPhysicalReadPage or
895  * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
896  */
897 static void
898 SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
899 {
900         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
901         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
902         int                     offset = rpageno * BLCKSZ;
903         char            path[MAXPGPATH];
904
905         SlruFileName(ctl, path, segno);
906         errno = slru_errno;
907         switch (slru_errcause)
908         {
909                 case SLRU_OPEN_FAILED:
910                         ereport(ERROR,
911                                         (errcode_for_file_access(),
912                                          errmsg("could not access status of transaction %u", xid),
913                                          errdetail("Could not open file \"%s\": %m.", path)));
914                         break;
915                 case SLRU_SEEK_FAILED:
916                         ereport(ERROR,
917                                         (errcode_for_file_access(),
918                                          errmsg("could not access status of transaction %u", xid),
919                                          errdetail("Could not seek in file \"%s\" to offset %u: %m.",
920                                                            path, offset)));
921                         break;
922                 case SLRU_READ_FAILED:
923                         if (errno)
924                                 ereport(ERROR,
925                                                 (errcode_for_file_access(),
926                                                  errmsg("could not access status of transaction %u", xid),
927                                                  errdetail("Could not read from file \"%s\" at offset %u: %m.",
928                                                                    path, offset)));
929                         else
930                                 ereport(ERROR,
931                                                 (errmsg("could not access status of transaction %u", xid),
932                                                  errdetail("Could not read from file \"%s\" at offset %u: read too few bytes.", path, offset)));
933                         break;
934                 case SLRU_WRITE_FAILED:
935                         if (errno)
936                                 ereport(ERROR,
937                                                 (errcode_for_file_access(),
938                                                  errmsg("could not access status of transaction %u", xid),
939                                                  errdetail("Could not write to file \"%s\" at offset %u: %m.",
940                                                                    path, offset)));
941                         else
942                                 ereport(ERROR,
943                                                 (errmsg("could not access status of transaction %u", xid),
944                                                  errdetail("Could not write to file \"%s\" at offset %u: wrote too few bytes.",
945                                                                    path, offset)));
946                         break;
947                 case SLRU_FSYNC_FAILED:
948                         ereport(data_sync_elevel(ERROR),
949                                         (errcode_for_file_access(),
950                                          errmsg("could not access status of transaction %u", xid),
951                                          errdetail("Could not fsync file \"%s\": %m.",
952                                                            path)));
953                         break;
954                 case SLRU_CLOSE_FAILED:
955                         ereport(ERROR,
956                                         (errcode_for_file_access(),
957                                          errmsg("could not access status of transaction %u", xid),
958                                          errdetail("Could not close file \"%s\": %m.",
959                                                            path)));
960                         break;
961                 default:
962                         /* can't get here, we trust */
963                         elog(ERROR, "unrecognized SimpleLru error cause: %d",
964                                  (int) slru_errcause);
965                         break;
966         }
967 }
968
969 /*
970  * Select the slot to re-use when we need a free slot.
971  *
972  * The target page number is passed because we need to consider the
973  * possibility that some other process reads in the target page while
974  * we are doing I/O to free a slot.  Hence, check or recheck to see if
975  * any slot already holds the target page, and return that slot if so.
976  * Thus, the returned slot is *either* a slot already holding the pageno
977  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
978  * or CLEAN).
979  *
980  * Control lock must be held at entry, and will be held at exit.
981  */
982 static int
983 SlruSelectLRUPage(SlruCtl ctl, int pageno)
984 {
985         SlruShared      shared = ctl->shared;
986
987         /* Outer loop handles restart after I/O */
988         for (;;)
989         {
990                 int                     slotno;
991                 int                     cur_count;
992                 int                     bestvalidslot = 0;      /* keep compiler quiet */
993                 int                     best_valid_delta = -1;
994                 int                     best_valid_page_number = 0; /* keep compiler quiet */
995                 int                     bestinvalidslot = 0;    /* keep compiler quiet */
996                 int                     best_invalid_delta = -1;
997                 int                     best_invalid_page_number = 0;   /* keep compiler quiet */
998
999                 /* See if page already has a buffer assigned */
1000                 for (slotno = 0; slotno < shared->num_slots; slotno++)
1001                 {
1002                         if (shared->page_number[slotno] == pageno &&
1003                                 shared->page_status[slotno] != SLRU_PAGE_EMPTY)
1004                                 return slotno;
1005                 }
1006
1007                 /*
1008                  * If we find any EMPTY slot, just select that one. Else choose a
1009                  * victim page to replace.  We normally take the least recently used
1010                  * valid page, but we will never take the slot containing
1011                  * latest_page_number, even if it appears least recently used.  We
1012                  * will select a slot that is already I/O busy only if there is no
1013                  * other choice: a read-busy slot will not be least recently used once
1014                  * the read finishes, and waiting for an I/O on a write-busy slot is
1015                  * inferior to just picking some other slot.  Testing shows the slot
1016                  * we pick instead will often be clean, allowing us to begin a read at
1017                  * once.
1018                  *
1019                  * Normally the page_lru_count values will all be different and so
1020                  * there will be a well-defined LRU page.  But since we allow
1021                  * concurrent execution of SlruRecentlyUsed() within
1022                  * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
1023                  * acquire the same lru_count values.  In that case we break ties by
1024                  * choosing the furthest-back page.
1025                  *
1026                  * Notice that this next line forcibly advances cur_lru_count to a
1027                  * value that is certainly beyond any value that will be in the
1028                  * page_lru_count array after the loop finishes.  This ensures that
1029                  * the next execution of SlruRecentlyUsed will mark the page newly
1030                  * used, even if it's for a page that has the current counter value.
1031                  * That gets us back on the path to having good data when there are
1032                  * multiple pages with the same lru_count.
1033                  */
1034                 cur_count = (shared->cur_lru_count)++;
1035                 for (slotno = 0; slotno < shared->num_slots; slotno++)
1036                 {
1037                         int                     this_delta;
1038                         int                     this_page_number;
1039
1040                         if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1041                                 return slotno;
1042                         this_delta = cur_count - shared->page_lru_count[slotno];
1043                         if (this_delta < 0)
1044                         {
1045                                 /*
1046                                  * Clean up in case shared updates have caused cur_count
1047                                  * increments to get "lost".  We back off the page counts,
1048                                  * rather than trying to increase cur_count, to avoid any
1049                                  * question of infinite loops or failure in the presence of
1050                                  * wrapped-around counts.
1051                                  */
1052                                 shared->page_lru_count[slotno] = cur_count;
1053                                 this_delta = 0;
1054                         }
1055                         this_page_number = shared->page_number[slotno];
1056                         if (this_page_number == shared->latest_page_number)
1057                                 continue;
1058                         if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1059                         {
1060                                 if (this_delta > best_valid_delta ||
1061                                         (this_delta == best_valid_delta &&
1062                                          ctl->PagePrecedes(this_page_number,
1063                                                                            best_valid_page_number)))
1064                                 {
1065                                         bestvalidslot = slotno;
1066                                         best_valid_delta = this_delta;
1067                                         best_valid_page_number = this_page_number;
1068                                 }
1069                         }
1070                         else
1071                         {
1072                                 if (this_delta > best_invalid_delta ||
1073                                         (this_delta == best_invalid_delta &&
1074                                          ctl->PagePrecedes(this_page_number,
1075                                                                            best_invalid_page_number)))
1076                                 {
1077                                         bestinvalidslot = slotno;
1078                                         best_invalid_delta = this_delta;
1079                                         best_invalid_page_number = this_page_number;
1080                                 }
1081                         }
1082                 }
1083
1084                 /*
1085                  * If all pages (except possibly the latest one) are I/O busy, we'll
1086                  * have to wait for an I/O to complete and then retry.  In that
1087                  * unhappy case, we choose to wait for the I/O on the least recently
1088                  * used slot, on the assumption that it was likely initiated first of
1089                  * all the I/Os in progress and may therefore finish first.
1090                  */
1091                 if (best_valid_delta < 0)
1092                 {
1093                         SimpleLruWaitIO(ctl, bestinvalidslot);
1094                         continue;
1095                 }
1096
1097                 /*
1098                  * If the selected page is clean, we're set.
1099                  */
1100                 if (!shared->page_dirty[bestvalidslot])
1101                         return bestvalidslot;
1102
1103                 /*
1104                  * Write the page.
1105                  */
1106                 SlruInternalWritePage(ctl, bestvalidslot, NULL);
1107
1108                 /*
1109                  * Now loop back and try again.  This is the easiest way of dealing
1110                  * with corner cases such as the victim page being re-dirtied while we
1111                  * wrote it.
1112                  */
1113         }
1114 }
1115
1116 /*
1117  * Flush dirty pages to disk during checkpoint or database shutdown
1118  */
1119 void
1120 SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
1121 {
1122         SlruShared      shared = ctl->shared;
1123         SlruFlushData fdata;
1124         int                     slotno;
1125         int                     pageno = 0;
1126         int                     i;
1127         bool            ok;
1128
1129         /*
1130          * Find and write dirty pages
1131          */
1132         fdata.num_files = 0;
1133
1134         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1135
1136         for (slotno = 0; slotno < shared->num_slots; slotno++)
1137         {
1138                 SlruInternalWritePage(ctl, slotno, &fdata);
1139
1140                 /*
1141                  * In some places (e.g. checkpoints), we cannot assert that the slot
1142                  * is clean now, since another process might have re-dirtied it
1143                  * already.  That's okay.
1144                  */
1145                 Assert(allow_redirtied ||
1146                            shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
1147                            (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1148                                 !shared->page_dirty[slotno]));
1149         }
1150
1151         LWLockRelease(shared->ControlLock);
1152
1153         /*
1154          * Now fsync and close any files that were open
1155          */
1156         ok = true;
1157         for (i = 0; i < fdata.num_files; i++)
1158         {
1159                 pgstat_report_wait_start(WAIT_EVENT_SLRU_FLUSH_SYNC);
1160                 if (ctl->do_fsync && pg_fsync(fdata.fd[i]) != 0)
1161                 {
1162                         slru_errcause = SLRU_FSYNC_FAILED;
1163                         slru_errno = errno;
1164                         pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1165                         ok = false;
1166                 }
1167                 pgstat_report_wait_end();
1168
1169                 if (CloseTransientFile(fdata.fd[i]) != 0)
1170                 {
1171                         slru_errcause = SLRU_CLOSE_FAILED;
1172                         slru_errno = errno;
1173                         pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1174                         ok = false;
1175                 }
1176         }
1177         if (!ok)
1178                 SlruReportIOError(ctl, pageno, InvalidTransactionId);
1179 }
1180
1181 /*
1182  * Remove all segments before the one holding the passed page number
1183  */
1184 void
1185 SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
1186 {
1187         SlruShared      shared = ctl->shared;
1188         int                     slotno;
1189
1190         /*
1191          * The cutoff point is the start of the segment containing cutoffPage.
1192          */
1193         cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1194
1195         /*
1196          * Scan shared memory and remove any pages preceding the cutoff page, to
1197          * ensure we won't rewrite them later.  (Since this is normally called in
1198          * or just after a checkpoint, any dirty pages should have been flushed
1199          * already ... we're just being extra careful here.)
1200          */
1201         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1202
1203 restart:;
1204
1205         /*
1206          * While we are holding the lock, make an important safety check: the
1207          * planned cutoff point must be <= the current endpoint page. Otherwise we
1208          * have already wrapped around, and proceeding with the truncation would
1209          * risk removing the current segment.
1210          */
1211         if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
1212         {
1213                 LWLockRelease(shared->ControlLock);
1214                 ereport(LOG,
1215                                 (errmsg("could not truncate directory \"%s\": apparent wraparound",
1216                                                 ctl->Dir)));
1217                 return;
1218         }
1219
1220         for (slotno = 0; slotno < shared->num_slots; slotno++)
1221         {
1222                 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1223                         continue;
1224                 if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
1225                         continue;
1226
1227                 /*
1228                  * If page is clean, just change state to EMPTY (expected case).
1229                  */
1230                 if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1231                         !shared->page_dirty[slotno])
1232                 {
1233                         shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1234                         continue;
1235                 }
1236
1237                 /*
1238                  * Hmm, we have (or may have) I/O operations acting on the page, so
1239                  * we've got to wait for them to finish and then start again. This is
1240                  * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
1241                  * wouldn't it be OK to just discard it without writing it?  For now,
1242                  * keep the logic the same as it was.)
1243                  */
1244                 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1245                         SlruInternalWritePage(ctl, slotno, NULL);
1246                 else
1247                         SimpleLruWaitIO(ctl, slotno);
1248                 goto restart;
1249         }
1250
1251         LWLockRelease(shared->ControlLock);
1252
1253         /* Now we can remove the old segment(s) */
1254         (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
1255 }
1256
1257 /*
1258  * Delete an individual SLRU segment, identified by the filename.
1259  *
1260  * NB: This does not touch the SLRU buffers themselves, callers have to ensure
1261  * they either can't yet contain anything, or have already been cleaned out.
1262  */
1263 static void
1264 SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
1265 {
1266         char            path[MAXPGPATH];
1267
1268         snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
1269         ereport(DEBUG2,
1270                         (errmsg("removing file \"%s\"", path)));
1271         unlink(path);
1272 }
1273
1274 /*
1275  * Delete an individual SLRU segment, identified by the segment number.
1276  */
1277 void
1278 SlruDeleteSegment(SlruCtl ctl, int segno)
1279 {
1280         SlruShared      shared = ctl->shared;
1281         int                     slotno;
1282         char            path[MAXPGPATH];
1283         bool            did_write;
1284
1285         /* Clean out any possibly existing references to the segment. */
1286         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1287 restart:
1288         did_write = false;
1289         for (slotno = 0; slotno < shared->num_slots; slotno++)
1290         {
1291                 int                     pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
1292
1293                 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1294                         continue;
1295
1296                 /* not the segment we're looking for */
1297                 if (pagesegno != segno)
1298                         continue;
1299
1300                 /* If page is clean, just change state to EMPTY (expected case). */
1301                 if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1302                         !shared->page_dirty[slotno])
1303                 {
1304                         shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1305                         continue;
1306                 }
1307
1308                 /* Same logic as SimpleLruTruncate() */
1309                 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1310                         SlruInternalWritePage(ctl, slotno, NULL);
1311                 else
1312                         SimpleLruWaitIO(ctl, slotno);
1313
1314                 did_write = true;
1315         }
1316
1317         /*
1318          * Be extra careful and re-check. The IO functions release the control
1319          * lock, so new pages could have been read in.
1320          */
1321         if (did_write)
1322                 goto restart;
1323
1324         snprintf(path, MAXPGPATH, "%s/%04X", ctl->Dir, segno);
1325         ereport(DEBUG2,
1326                         (errmsg("removing file \"%s\"", path)));
1327         unlink(path);
1328
1329         LWLockRelease(shared->ControlLock);
1330 }
1331
1332 /*
1333  * SlruScanDirectory callback
1334  *              This callback reports true if there's any segment prior to the one
1335  *              containing the page passed as "data".
1336  */
1337 bool
1338 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
1339 {
1340         int                     cutoffPage = *(int *) data;
1341
1342         cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1343
1344         if (ctl->PagePrecedes(segpage, cutoffPage))
1345                 return true;                    /* found one; don't iterate any more */
1346
1347         return false;                           /* keep going */
1348 }
1349
1350 /*
1351  * SlruScanDirectory callback.
1352  *              This callback deletes segments prior to the one passed in as "data".
1353  */
1354 static bool
1355 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
1356 {
1357         int                     cutoffPage = *(int *) data;
1358
1359         if (ctl->PagePrecedes(segpage, cutoffPage))
1360                 SlruInternalDeleteSegment(ctl, filename);
1361
1362         return false;                           /* keep going */
1363 }
1364
1365 /*
1366  * SlruScanDirectory callback.
1367  *              This callback deletes all segments.
1368  */
1369 bool
1370 SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
1371 {
1372         SlruInternalDeleteSegment(ctl, filename);
1373
1374         return false;                           /* keep going */
1375 }
1376
1377 /*
1378  * Scan the SimpleLru directory and apply a callback to each file found in it.
1379  *
1380  * If the callback returns true, the scan is stopped.  The last return value
1381  * from the callback is returned.
1382  *
1383  * The callback receives the following arguments: 1. the SlruCtl struct for the
1384  * slru being truncated; 2. the filename being considered; 3. the page number
1385  * for the first page of that file; 4. a pointer to the opaque data given to us
1386  * by the caller.
1387  *
1388  * Note that the ordering in which the directory is scanned is not guaranteed.
1389  *
1390  * Note that no locking is applied.
1391  */
1392 bool
1393 SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1394 {
1395         bool            retval = false;
1396         DIR                *cldir;
1397         struct dirent *clde;
1398         int                     segno;
1399         int                     segpage;
1400
1401         cldir = AllocateDir(ctl->Dir);
1402         while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1403         {
1404                 size_t          len;
1405
1406                 len = strlen(clde->d_name);
1407
1408                 if ((len == 4 || len == 5 || len == 6) &&
1409                         strspn(clde->d_name, "0123456789ABCDEF") == len)
1410                 {
1411                         segno = (int) strtol(clde->d_name, NULL, 16);
1412                         segpage = segno * SLRU_PAGES_PER_SEGMENT;
1413
1414                         elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1415                                  ctl->Dir, clde->d_name);
1416                         retval = callback(ctl, clde->d_name, segpage, data);
1417                         if (retval)
1418                                 break;
1419                 }
1420         }
1421         FreeDir(cldir);
1422
1423         return retval;
1424 }