]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/slru.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / transam / slru.c
1 /*-------------------------------------------------------------------------
2  *
3  * slru.c
4  *              Simple LRU buffering for transaction status logfiles
5  *
6  * We use a simple least-recently-used scheme to manage a pool of page
7  * buffers.  Under ordinary circumstances we expect that write
8  * traffic will occur mostly to the latest page (and to the just-prior
9  * page, soon after a page transition).  Read traffic will probably touch
10  * a larger span of pages, but in any case a fairly small number of page
11  * buffers should be sufficient.  So, we just search the buffers using plain
12  * linear search; there's no need for a hashtable or anything fancy.
13  * The management algorithm is straight LRU except that we will never swap
14  * out the latest page (since we know it's going to be hit again eventually).
15  *
16  * We use a control LWLock to protect the shared data structures, plus
17  * per-buffer LWLocks that synchronize I/O for each buffer.  The control lock
18  * must be held to examine or modify any shared state.  A process that is
19  * reading in or writing out a page buffer does not hold the control lock,
20  * only the per-buffer lock for the buffer it is working on.
21  *
22  * "Holding the control lock" means exclusive lock in all cases except for
23  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
24  * the implications of that.
25  *
26  * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
27  * before releasing the control lock.  The per-buffer lock is released after
28  * completing the I/O, re-acquiring the control lock, and updating the shared
29  * state.  (Deadlock is not possible here, because we never try to initiate
30  * I/O when someone else is already doing I/O on the same buffer.)
31  * To wait for I/O to complete, release the control lock, acquire the
32  * per-buffer lock in shared mode, immediately release the per-buffer lock,
33  * reacquire the control lock, and then recheck state (since arbitrary things
34  * could have happened while we didn't have the lock).
35  *
36  * As with the regular buffer manager, it is possible for another process
37  * to re-dirty a page that is currently being written out.  This is handled
38  * by re-setting the page's page_dirty flag.
39  *
40  *
41  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
42  * Portions Copyright (c) 1994, Regents of the University of California
43  *
44  * src/backend/access/transam/slru.c
45  *
46  *-------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49
50 #include <fcntl.h>
51 #include <sys/stat.h>
52 #include <unistd.h>
53
54 #include "access/slru.h"
55 #include "access/transam.h"
56 #include "access/xlog.h"
57 #include "storage/fd.h"
58 #include "storage/shmem.h"
59 #include "miscadmin.h"
60
61
/*
 * Format the path of segment file "seg" into "path": the control struct's
 * Dir, a slash, then the segment number as at least four upper-case hex
 * digits.  The output is bounded by MAXPGPATH, so "path" must be a buffer of
 * at least that many bytes.
 */
#define SlruFileName(ctl, path, seg) \
        snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
64
65 /*
66  * During SimpleLruFlush(), we will usually not need to write/fsync more
67  * than one or two physical files, but we may need to write several pages
68  * per file.  We can consolidate the I/O requests by leaving files open
69  * until control returns to SimpleLruFlush().  This data structure remembers
70  * which files are open.
71  */
/* Capacity of the fd[] and segno[] arrays in SlruFlushData */
#define MAX_FLUSH_BUFFERS       16

/*
 * fd[] and segno[] are parallel arrays: entry i of each describes the same
 * open file.  Only the first num_files entries are meaningful.
 */
typedef struct SlruFlushData
{
        int                     num_files;              /* # files actually open */
        int                     fd[MAX_FLUSH_BUFFERS];  /* their FD's */
        int                     segno[MAX_FLUSH_BUFFERS];               /* their log seg#s */
} SlruFlushData;

/* Handle passed to the write routines; NULL means a standalone write */
typedef struct SlruFlushData *SlruFlush;
82
83 /*
84  * Macro to mark a buffer slot "most recently used".  Note multiple evaluation
85  * of arguments!
86  *
87  * The reason for the if-test is that there are often many consecutive
88  * accesses to the same page (particularly the latest page).  By suppressing
89  * useless increments of cur_lru_count, we reduce the probability that old
90  * pages' counts will "wrap around" and make them appear recently used.
91  *
92  * We allow this code to be executed concurrently by multiple processes within
93  * SimpleLruReadPage_ReadOnly().  As long as int reads and writes are atomic,
94  * this should not cause any completely-bogus values to enter the computation.
95  * However, it is possible for either cur_lru_count or individual
96  * page_lru_count entries to be "reset" to lower values than they should have,
97  * in case a process is delayed while it executes this macro.  With care in
98  * SlruSelectLRUPage(), this does little harm, and in any case the absolute
99  * worst possible consequence is a nonoptimal choice of page to evict.  The
100  * gain from allowing concurrent reads of SLRU pages seems worth it.
101  */
#define SlruRecentlyUsed(shared, slotno)        \
        do { \
                /* snapshot the global counter once; see concurrency notes above */ \
                int             new_lru_count = (shared)->cur_lru_count; \
                /* slot already carries the current count: nothing to do */ \
                if (new_lru_count != (shared)->page_lru_count[slotno]) { \
                        /* advance the global counter and stamp the slot with it */ \
                        (shared)->cur_lru_count = ++new_lru_count; \
                        (shared)->page_lru_count[slotno] = new_lru_count; \
                } \
        } while (0)
110
/*
 * Saved info for SlruReportIOError
 *
 * The physical I/O routines cannot report errors themselves, so they record
 * which operation failed (slru_errcause) and the errno seen at that moment
 * (slru_errno) and return failure; the caller reports later.
 */
typedef enum
{
        SLRU_OPEN_FAILED,       /* OpenTransientFile() failed */
        SLRU_SEEK_FAILED,       /* lseek() failed */
        SLRU_READ_FAILED,       /* read() did not return a full page */
        SLRU_WRITE_FAILED,      /* presumably set by the write path -- not visible here */
        SLRU_FSYNC_FAILED,      /* presumably set by the write/flush path -- not visible here */
        SLRU_CLOSE_FAILED       /* CloseTransientFile() failed */
} SlruErrorCause;

static SlruErrorCause slru_errcause;    /* which operation failed */
static int      slru_errno;             /* errno captured right after the failure */
124
125
126 static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
127 static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
128 static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
129 static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
130 static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
131                                           SlruFlush fdata);
132 static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
133 static int      SlruSelectLRUPage(SlruCtl ctl, int pageno);
134
135 static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
136                                                   int segpage, void *data);
137
138 /*
139  * Initialization of shared memory
140  */
141
142 Size
143 SimpleLruShmemSize(int nslots, int nlsns)
144 {
145         Size            sz;
146
147         /* we assume nslots isn't so large as to risk overflow */
148         sz = MAXALIGN(sizeof(SlruSharedData));
149         sz += MAXALIGN(nslots * sizeof(char *));        /* page_buffer[] */
150         sz += MAXALIGN(nslots * sizeof(SlruPageStatus));        /* page_status[] */
151         sz += MAXALIGN(nslots * sizeof(bool));          /* page_dirty[] */
152         sz += MAXALIGN(nslots * sizeof(int));           /* page_number[] */
153         sz += MAXALIGN(nslots * sizeof(int));           /* page_lru_count[] */
154         sz += MAXALIGN(nslots * sizeof(LWLock *));      /* buffer_locks[] */
155
156         if (nlsns > 0)
157                 sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));    /* group_lsn[] */
158
159         return BUFFERALIGN(sz) + BLCKSZ * nslots;
160 }
161
162 void
163 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
164                           LWLock *ctllock, const char *subdir)
165 {
166         SlruShared      shared;
167         bool            found;
168
169         shared = (SlruShared) ShmemInitStruct(name,
170                                                                                   SimpleLruShmemSize(nslots, nlsns),
171                                                                                   &found);
172
173         if (!IsUnderPostmaster)
174         {
175                 /* Initialize locks and shared memory area */
176                 char       *ptr;
177                 Size            offset;
178                 int                     slotno;
179
180                 Assert(!found);
181
182                 memset(shared, 0, sizeof(SlruSharedData));
183
184                 shared->ControlLock = ctllock;
185
186                 shared->num_slots = nslots;
187                 shared->lsn_groups_per_page = nlsns;
188
189                 shared->cur_lru_count = 0;
190
191                 /* shared->latest_page_number will be set later */
192
193                 ptr = (char *) shared;
194                 offset = MAXALIGN(sizeof(SlruSharedData));
195                 shared->page_buffer = (char **) (ptr + offset);
196                 offset += MAXALIGN(nslots * sizeof(char *));
197                 shared->page_status = (SlruPageStatus *) (ptr + offset);
198                 offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
199                 shared->page_dirty = (bool *) (ptr + offset);
200                 offset += MAXALIGN(nslots * sizeof(bool));
201                 shared->page_number = (int *) (ptr + offset);
202                 offset += MAXALIGN(nslots * sizeof(int));
203                 shared->page_lru_count = (int *) (ptr + offset);
204                 offset += MAXALIGN(nslots * sizeof(int));
205                 shared->buffer_locks = (LWLock **) (ptr + offset);
206                 offset += MAXALIGN(nslots * sizeof(LWLock *));
207
208                 if (nlsns > 0)
209                 {
210                         shared->group_lsn = (XLogRecPtr *) (ptr + offset);
211                         offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
212                 }
213
214                 ptr += BUFFERALIGN(offset);
215                 for (slotno = 0; slotno < nslots; slotno++)
216                 {
217                         shared->page_buffer[slotno] = ptr;
218                         shared->page_status[slotno] = SLRU_PAGE_EMPTY;
219                         shared->page_dirty[slotno] = false;
220                         shared->page_lru_count[slotno] = 0;
221                         shared->buffer_locks[slotno] = LWLockAssign();
222                         ptr += BLCKSZ;
223                 }
224         }
225         else
226                 Assert(found);
227
228         /*
229          * Initialize the unshared control struct, including directory path. We
230          * assume caller set PagePrecedes.
231          */
232         ctl->shared = shared;
233         ctl->do_fsync = true;           /* default behavior */
234         StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));
235 }
236
237 /*
238  * Initialize (or reinitialize) a page to zeroes.
239  *
240  * The page is not actually written, just set up in shared memory.
241  * The slot number of the new page is returned.
242  *
243  * Control lock must be held at entry, and will be held at exit.
244  */
245 int
246 SimpleLruZeroPage(SlruCtl ctl, int pageno)
247 {
248         SlruShared      shared = ctl->shared;
249         int                     slotno;
250
251         /* Find a suitable buffer slot for the page */
252         slotno = SlruSelectLRUPage(ctl, pageno);
253         Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
254                    (shared->page_status[slotno] == SLRU_PAGE_VALID &&
255                         !shared->page_dirty[slotno]) ||
256                    shared->page_number[slotno] == pageno);
257
258         /* Mark the slot as containing this page */
259         shared->page_number[slotno] = pageno;
260         shared->page_status[slotno] = SLRU_PAGE_VALID;
261         shared->page_dirty[slotno] = true;
262         SlruRecentlyUsed(shared, slotno);
263
264         /* Set the buffer to zeroes */
265         MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
266
267         /* Set the LSNs for this new page to zero */
268         SimpleLruZeroLSNs(ctl, slotno);
269
270         /* Assume this page is now the latest active page */
271         shared->latest_page_number = pageno;
272
273         return slotno;
274 }
275
276 /*
277  * Zero all the LSNs we store for this slru page.
278  *
279  * This should be called each time we create a new page, and each time we read
280  * in a page from disk into an existing buffer.  (Such an old page cannot
281  * have any interesting LSNs, since we'd have flushed them before writing
282  * the page in the first place.)
283  *
284  * This assumes that InvalidXLogRecPtr is bitwise-all-0.
285  */
286 static void
287 SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
288 {
289         SlruShared      shared = ctl->shared;
290
291         if (shared->lsn_groups_per_page > 0)
292                 MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
293                            shared->lsn_groups_per_page * sizeof(XLogRecPtr));
294 }
295
296 /*
297  * Wait for any active I/O on a page slot to finish.  (This does not
298  * guarantee that new I/O hasn't been started before we return, though.
299  * In fact the slot might not even contain the same page anymore.)
300  *
301  * Control lock must be held at entry, and will be held at exit.
302  */
static void
SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
        SlruShared      shared = ctl->shared;

        /* See notes at top of file */
        /*
         * The I/O performer holds the per-buffer lock exclusively, so taking it
         * in shared mode blocks until that I/O is done.  We drop it at once: we
         * only wanted the wait, not the lock.
         */
        LWLockRelease(shared->ControlLock);
        LWLockAcquire(shared->buffer_locks[slotno], LW_SHARED);
        LWLockRelease(shared->buffer_locks[slotno]);
        LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

        /*
         * If the slot is still in an io-in-progress state, then either someone
         * already started a new I/O on the slot, or a previous I/O failed and
         * neglected to reset the page state.  That shouldn't happen, really, but
         * it seems worth a few extra cycles to check and recover from it. We can
         * cheaply test for failure by seeing if the buffer lock is still held (we
         * assume that transaction abort would release the lock).
         */
        if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
                shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
        {
                /* getting the lock without waiting means nobody is doing I/O */
                if (LWLockConditionalAcquire(shared->buffer_locks[slotno], LW_SHARED))
                {
                        /* indeed, the I/O must have failed */
                        if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
                                shared->page_status[slotno] = SLRU_PAGE_EMPTY;  /* nothing usable was read */
                        else    /* write_in_progress */
                        {
                                /* contents are still good, but must be written again */
                                shared->page_status[slotno] = SLRU_PAGE_VALID;
                                shared->page_dirty[slotno] = true;
                        }
                        LWLockRelease(shared->buffer_locks[slotno]);
                }
        }
}
339
340 /*
341  * Find a page in a shared buffer, reading it in if necessary.
342  * The page number must correspond to an already-initialized page.
343  *
344  * If write_ok is true then it is OK to return a page that is in
345  * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
346  * that modification of the page is safe.  If write_ok is false then we
347  * will not return the page until it is not undergoing active I/O.
348  *
349  * The passed-in xid is used only for error reporting, and may be
350  * InvalidTransactionId if no specific xid is associated with the action.
351  *
352  * Return value is the shared-buffer slot number now holding the page.
353  * The buffer's LRU access info is updated.
354  *
355  * Control lock must be held at entry, and will be held at exit.
356  */
int
SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
                                  TransactionId xid)
{
        SlruShared      shared = ctl->shared;

        /* Outer loop handles restart if we must wait for someone else's I/O */
        for (;;)
        {
                int                     slotno;
                bool            ok;

                /* See if page already is in memory; if not, pick victim slot */
                slotno = SlruSelectLRUPage(ctl, pageno);

                /* Did we find the page in memory? */
                if (shared->page_number[slotno] == pageno &&
                        shared->page_status[slotno] != SLRU_PAGE_EMPTY)
                {
                        /*
                         * If page is still being read in, we must wait for I/O.  Likewise
                         * if the page is being written and the caller said that's not OK.
                         */
                        if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
                                (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
                                 !write_ok))
                        {
                                SimpleLruWaitIO(ctl, slotno);
                                /* Now we must recheck state from the top */
                                continue;
                        }
                        /* Otherwise, it's ready to use */
                        SlruRecentlyUsed(shared, slotno);
                        return slotno;
                }

                /* We found no match; assert we selected a freeable slot */
                Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
                           (shared->page_status[slotno] == SLRU_PAGE_VALID &&
                                !shared->page_dirty[slotno]));

                /* Mark the slot read-busy */
                shared->page_number[slotno] = pageno;
                shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
                shared->page_dirty[slotno] = false;

                /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
                LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);

                /* Release control lock while doing I/O */
                LWLockRelease(shared->ControlLock);

                /*
                 * Do the read.  Failure is only recorded here (in ok and the saved
                 * error info); it is reported after shared state has been repaired.
                 */
                ok = SlruPhysicalReadPage(ctl, pageno, slotno);

                /* Set the LSNs for this newly read-in page to zero */
                SimpleLruZeroLSNs(ctl, slotno);

                /* Re-acquire control lock and update page state */
                LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

                /* nobody else may have touched the slot while it was read-busy */
                Assert(shared->page_number[slotno] == pageno &&
                           shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
                           !shared->page_dirty[slotno]);

                /* a failed read leaves the slot empty rather than holding junk */
                shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;

                LWLockRelease(shared->buffer_locks[slotno]);

                /*
                 * Now it's okay to ereport if we failed.
                 *
                 * NOTE(review): the code below relies on SlruReportIOError not
                 * returning; otherwise an EMPTY slot would be handed back -- confirm.
                 */
                if (!ok)
                        SlruReportIOError(ctl, pageno, xid);

                SlruRecentlyUsed(shared, slotno);
                return slotno;
        }
}
434
435 /*
436  * Find a page in a shared buffer, reading it in if necessary.
437  * The page number must correspond to an already-initialized page.
438  * The caller must intend only read-only access to the page.
439  *
440  * The passed-in xid is used only for error reporting, and may be
441  * InvalidTransactionId if no specific xid is associated with the action.
442  *
443  * Return value is the shared-buffer slot number now holding the page.
444  * The buffer's LRU access info is updated.
445  *
446  * Control lock must NOT be held at entry, but will be held at exit.
447  * It is unspecified whether the lock will be shared or exclusive.
448  */
449 int
450 SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
451 {
452         SlruShared      shared = ctl->shared;
453         int                     slotno;
454
455         /* Try to find the page while holding only shared lock */
456         LWLockAcquire(shared->ControlLock, LW_SHARED);
457
458         /* See if page is already in a buffer */
459         for (slotno = 0; slotno < shared->num_slots; slotno++)
460         {
461                 if (shared->page_number[slotno] == pageno &&
462                         shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
463                         shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
464                 {
465                         /* See comments for SlruRecentlyUsed macro */
466                         SlruRecentlyUsed(shared, slotno);
467                         return slotno;
468                 }
469         }
470
471         /* No luck, so switch to normal exclusive lock and do regular read */
472         LWLockRelease(shared->ControlLock);
473         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
474
475         return SimpleLruReadPage(ctl, pageno, true, xid);
476 }
477
478 /*
479  * Write a page from a shared buffer, if necessary.
480  * Does nothing if the specified slot is not dirty.
481  *
482  * NOTE: only one write attempt is made here.  Hence, it is possible that
483  * the page is still dirty at exit (if someone else re-dirtied it during
484  * the write).  However, we *do* attempt a fresh write even if the page
485  * is already being written; this is for checkpoints.
486  *
487  * Control lock must be held at entry, and will be held at exit.
488  */
static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
        SlruShared      shared = ctl->shared;
        /* remember which page we were asked about; the slot may be reused while we wait */
        int                     pageno = shared->page_number[slotno];
        bool            ok;

        /* If a write is in progress, wait for it to finish */
        while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
                   shared->page_number[slotno] == pageno)
        {
                SimpleLruWaitIO(ctl, slotno);
        }

        /*
         * Do nothing if page is not dirty, or if buffer no longer contains the
         * same page we were called for.
         */
        if (!shared->page_dirty[slotno] ||
                shared->page_status[slotno] != SLRU_PAGE_VALID ||
                shared->page_number[slotno] != pageno)
                return;

        /*
         * Mark the slot write-busy, and clear the dirtybit.  After this point, a
         * transaction status update on this page will mark it dirty again.
         */
        shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
        shared->page_dirty[slotno] = false;

        /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
        LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);

        /* Release control lock while doing I/O */
        LWLockRelease(shared->ControlLock);

        /* Do the write; failure is reported only after state is repaired */
        ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);

        /*
         * If we failed, and we're in a flush, better close the files.
         *
         * NOTE(review): fdata->num_files is not reset here; presumably harmless
         * because the error report below does not return -- confirm.
         */
        if (!ok && fdata)
        {
                int                     i;

                for (i = 0; i < fdata->num_files; i++)
                        CloseTransientFile(fdata->fd[i]);
        }

        /* Re-acquire control lock and update page state */
        LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

        /* the slot must still be ours; page_dirty may legitimately have changed */
        Assert(shared->page_number[slotno] == pageno &&
                   shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);

        /* If we failed to write, mark the page dirty again */
        if (!ok)
                shared->page_dirty[slotno] = true;

        shared->page_status[slotno] = SLRU_PAGE_VALID;

        LWLockRelease(shared->buffer_locks[slotno]);

        /* Now it's okay to ereport if we failed */
        if (!ok)
                SlruReportIOError(ctl, pageno, InvalidTransactionId);
}
555
/*
 * Wrapper of SlruInternalWritePage, for external callers.
 * fdata is always passed a NULL here.
 *
 * Writes the page in slot "slotno" if it is dirty, as a standalone write
 * (no open-file state is shared with other writes).  The locking contract is
 * that of SlruInternalWritePage: control lock must be held at entry, and
 * will be held at exit.
 */
void
SimpleLruWritePage(SlruCtl ctl, int slotno)
{
        SlruInternalWritePage(ctl, slotno, NULL);
}
565
566 /*
567  * Return whether the given page exists on disk.
568  *
569  * A false return means that either the file does not exist, or that it's not
570  * large enough to contain the given page.
571  */
572 bool
573 SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
574 {
575         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
576         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
577         int                     offset = rpageno * BLCKSZ;
578         char            path[MAXPGPATH];
579         int                     fd;
580         bool            result;
581         off_t           endpos;
582
583         SlruFileName(ctl, path, segno);
584
585         fd = OpenTransientFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
586         if (fd < 0)
587         {
588                 /* expected: file doesn't exist */
589                 if (errno == ENOENT)
590                         return false;
591
592                 /* report error normally */
593                 slru_errcause = SLRU_OPEN_FAILED;
594                 slru_errno = errno;
595                 SlruReportIOError(ctl, pageno, 0);
596         }
597
598         if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
599         {
600                 slru_errcause = SLRU_OPEN_FAILED;
601                 slru_errno = errno;
602                 SlruReportIOError(ctl, pageno, 0);
603         }
604
605         result = endpos >= (off_t) (offset + BLCKSZ);
606
607         CloseTransientFile(fd);
608         return result;
609 }
610
611 /*
612  * Physical read of a (previously existing) page into a buffer slot
613  *
614  * On failure, we cannot just ereport(ERROR) since caller has put state in
615  * shared memory that must be undone.  So, we return FALSE and save enough
616  * info in static variables to let SlruReportIOError make the report.
617  *
618  * For now, assume it's not worth keeping a file pointer open across
619  * read/write operations.  We could cache one virtual file pointer ...
620  */
621 static bool
622 SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
623 {
624         SlruShared      shared = ctl->shared;
625         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
626         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
627         int                     offset = rpageno * BLCKSZ;
628         char            path[MAXPGPATH];
629         int                     fd;
630
631         SlruFileName(ctl, path, segno);
632
633         /*
634          * In a crash-and-restart situation, it's possible for us to receive
635          * commands to set the commit status of transactions whose bits are in
636          * already-truncated segments of the commit log (see notes in
637          * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
638          * where the file doesn't exist, and return zeroes instead.
639          */
640         fd = OpenTransientFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
641         if (fd < 0)
642         {
643                 if (errno != ENOENT || !InRecovery)
644                 {
645                         slru_errcause = SLRU_OPEN_FAILED;
646                         slru_errno = errno;
647                         return false;
648                 }
649
650                 ereport(LOG,
651                                 (errmsg("file \"%s\" doesn't exist, reading as zeroes",
652                                                 path)));
653                 MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
654                 return true;
655         }
656
657         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
658         {
659                 slru_errcause = SLRU_SEEK_FAILED;
660                 slru_errno = errno;
661                 CloseTransientFile(fd);
662                 return false;
663         }
664
665         errno = 0;
666         if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
667         {
668                 slru_errcause = SLRU_READ_FAILED;
669                 slru_errno = errno;
670                 CloseTransientFile(fd);
671                 return false;
672         }
673
674         if (CloseTransientFile(fd))
675         {
676                 slru_errcause = SLRU_CLOSE_FAILED;
677                 slru_errno = errno;
678                 return false;
679         }
680
681         return true;
682 }
683
684 /*
685  * Physical write of a page from a buffer slot
686  *
687  * On failure, we cannot just ereport(ERROR) since caller has put state in
688  * shared memory that must be undone.  So, we return FALSE and save enough
689  * info in static variables to let SlruReportIOError make the report.
690  *
691  * For now, assume it's not worth keeping a file pointer open across
692  * independent read/write operations.  We do batch operations during
693  * SimpleLruFlush, though.
694  *
695  * fdata is NULL for a standalone write, pointer to open-file info during
696  * SimpleLruFlush.
697  */
698 static bool
699 SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
700 {
701         SlruShared      shared = ctl->shared;
702         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
703         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
704         int                     offset = rpageno * BLCKSZ;
705         char            path[MAXPGPATH];
706         int                     fd = -1;
707
708         /*
709          * Honor the write-WAL-before-data rule, if appropriate, so that we do not
710          * write out data before associated WAL records.  This is the same action
711          * performed during FlushBuffer() in the main buffer manager.
712          */
713         if (shared->group_lsn != NULL)
714         {
715                 /*
716                  * We must determine the largest async-commit LSN for the page. This
717                  * is a bit tedious, but since this entire function is a slow path
718                  * anyway, it seems better to do this here than to maintain a per-page
719                  * LSN variable (which'd need an extra comparison in the
720                  * transaction-commit path).
721                  */
722                 XLogRecPtr      max_lsn;
723                 int                     lsnindex,
724                                         lsnoff;
725
726                 lsnindex = slotno * shared->lsn_groups_per_page;
727                 max_lsn = shared->group_lsn[lsnindex++];
728                 for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
729                 {
730                         XLogRecPtr      this_lsn = shared->group_lsn[lsnindex++];
731
732                         if (max_lsn < this_lsn)
733                                 max_lsn = this_lsn;
734                 }
735
736                 if (!XLogRecPtrIsInvalid(max_lsn))
737                 {
738                         /*
739                          * As noted above, elog(ERROR) is not acceptable here, so if
740                          * XLogFlush were to fail, we must PANIC.  This isn't much of a
741                          * restriction because XLogFlush is just about all critical
742                          * section anyway, but let's make sure.
743                          */
744                         START_CRIT_SECTION();
745                         XLogFlush(max_lsn);
746                         END_CRIT_SECTION();
747                 }
748         }
749
750         /*
751          * During a Flush, we may already have the desired file open.
752          */
753         if (fdata)
754         {
755                 int                     i;
756
757                 for (i = 0; i < fdata->num_files; i++)
758                 {
759                         if (fdata->segno[i] == segno)
760                         {
761                                 fd = fdata->fd[i];
762                                 break;
763                         }
764                 }
765         }
766
767         if (fd < 0)
768         {
769                 /*
770                  * If the file doesn't already exist, we should create it.  It is
771                  * possible for this to need to happen when writing a page that's not
772                  * first in its segment; we assume the OS can cope with that. (Note:
773                  * it might seem that it'd be okay to create files only when
774                  * SimpleLruZeroPage is called for the first page of a segment.
775                  * However, if after a crash and restart the REDO logic elects to
776                  * replay the log from a checkpoint before the latest one, then it's
777                  * possible that we will get commands to set transaction status of
778                  * transactions that have already been truncated from the commit log.
779                  * Easiest way to deal with that is to accept references to
780                  * nonexistent files here and in SlruPhysicalReadPage.)
781                  *
782                  * Note: it is possible for more than one backend to be executing this
783                  * code simultaneously for different pages of the same file. Hence,
784                  * don't use O_EXCL or O_TRUNC or anything like that.
785                  */
786                 SlruFileName(ctl, path, segno);
787                 fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY,
788                                                            S_IRUSR | S_IWUSR);
789                 if (fd < 0)
790                 {
791                         slru_errcause = SLRU_OPEN_FAILED;
792                         slru_errno = errno;
793                         return false;
794                 }
795
796                 if (fdata)
797                 {
798                         if (fdata->num_files < MAX_FLUSH_BUFFERS)
799                         {
800                                 fdata->fd[fdata->num_files] = fd;
801                                 fdata->segno[fdata->num_files] = segno;
802                                 fdata->num_files++;
803                         }
804                         else
805                         {
806                                 /*
807                                  * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
808                                  * fall back to treating it as a standalone write.
809                                  */
810                                 fdata = NULL;
811                         }
812                 }
813         }
814
815         if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
816         {
817                 slru_errcause = SLRU_SEEK_FAILED;
818                 slru_errno = errno;
819                 if (!fdata)
820                         CloseTransientFile(fd);
821                 return false;
822         }
823
824         errno = 0;
825         if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
826         {
827                 /* if write didn't set errno, assume problem is no disk space */
828                 if (errno == 0)
829                         errno = ENOSPC;
830                 slru_errcause = SLRU_WRITE_FAILED;
831                 slru_errno = errno;
832                 if (!fdata)
833                         CloseTransientFile(fd);
834                 return false;
835         }
836
837         /*
838          * If not part of Flush, need to fsync now.  We assume this happens
839          * infrequently enough that it's not a performance issue.
840          */
841         if (!fdata)
842         {
843                 if (ctl->do_fsync && pg_fsync(fd))
844                 {
845                         slru_errcause = SLRU_FSYNC_FAILED;
846                         slru_errno = errno;
847                         CloseTransientFile(fd);
848                         return false;
849                 }
850
851                 if (CloseTransientFile(fd))
852                 {
853                         slru_errcause = SLRU_CLOSE_FAILED;
854                         slru_errno = errno;
855                         return false;
856                 }
857         }
858
859         return true;
860 }
861
862 /*
863  * Issue the error message after failure of SlruPhysicalReadPage or
864  * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
865  */
866 static void
867 SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
868 {
869         int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;
870         int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
871         int                     offset = rpageno * BLCKSZ;
872         char            path[MAXPGPATH];
873
874         SlruFileName(ctl, path, segno);
875         errno = slru_errno;
876         switch (slru_errcause)
877         {
878                 case SLRU_OPEN_FAILED:
879                         ereport(ERROR,
880                                         (errcode_for_file_access(),
881                                          errmsg("could not access status of transaction %u", xid),
882                                          errdetail("Could not open file \"%s\": %m.", path)));
883                         break;
884                 case SLRU_SEEK_FAILED:
885                         ereport(ERROR,
886                                         (errcode_for_file_access(),
887                                          errmsg("could not access status of transaction %u", xid),
888                                  errdetail("Could not seek in file \"%s\" to offset %u: %m.",
889                                                    path, offset)));
890                         break;
891                 case SLRU_READ_FAILED:
892                         ereport(ERROR,
893                                         (errcode_for_file_access(),
894                                          errmsg("could not access status of transaction %u", xid),
895                            errdetail("Could not read from file \"%s\" at offset %u: %m.",
896                                                  path, offset)));
897                         break;
898                 case SLRU_WRITE_FAILED:
899                         ereport(ERROR,
900                                         (errcode_for_file_access(),
901                                          errmsg("could not access status of transaction %u", xid),
902                                 errdetail("Could not write to file \"%s\" at offset %u: %m.",
903                                                   path, offset)));
904                         break;
905                 case SLRU_FSYNC_FAILED:
906                         ereport(ERROR,
907                                         (errcode_for_file_access(),
908                                          errmsg("could not access status of transaction %u", xid),
909                                          errdetail("Could not fsync file \"%s\": %m.",
910                                                            path)));
911                         break;
912                 case SLRU_CLOSE_FAILED:
913                         ereport(ERROR,
914                                         (errcode_for_file_access(),
915                                          errmsg("could not access status of transaction %u", xid),
916                                          errdetail("Could not close file \"%s\": %m.",
917                                                            path)));
918                         break;
919                 default:
920                         /* can't get here, we trust */
921                         elog(ERROR, "unrecognized SimpleLru error cause: %d",
922                                  (int) slru_errcause);
923                         break;
924         }
925 }
926
927 /*
928  * Select the slot to re-use when we need a free slot.
929  *
930  * The target page number is passed because we need to consider the
931  * possibility that some other process reads in the target page while
932  * we are doing I/O to free a slot.  Hence, check or recheck to see if
933  * any slot already holds the target page, and return that slot if so.
934  * Thus, the returned slot is *either* a slot already holding the pageno
935  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
936  * or CLEAN).
937  *
938  * Control lock must be held at entry, and will be held at exit.
939  */
940 static int
941 SlruSelectLRUPage(SlruCtl ctl, int pageno)
942 {
943         SlruShared      shared = ctl->shared;
944
945         /* Outer loop handles restart after I/O */
946         for (;;)
947         {
948                 int                     slotno;
949                 int                     cur_count;
950                 int                     bestvalidslot = 0;      /* keep compiler quiet */
951                 int                     best_valid_delta = -1;
952                 int                     best_valid_page_number = 0; /* keep compiler quiet */
953                 int                     bestinvalidslot = 0;            /* keep compiler quiet */
954                 int                     best_invalid_delta = -1;
955                 int                     best_invalid_page_number = 0;           /* keep compiler quiet */
956
957                 /* See if page already has a buffer assigned */
958                 for (slotno = 0; slotno < shared->num_slots; slotno++)
959                 {
960                         if (shared->page_number[slotno] == pageno &&
961                                 shared->page_status[slotno] != SLRU_PAGE_EMPTY)
962                                 return slotno;
963                 }
964
965                 /*
966                  * If we find any EMPTY slot, just select that one. Else choose a
967                  * victim page to replace.  We normally take the least recently used
968                  * valid page, but we will never take the slot containing
969                  * latest_page_number, even if it appears least recently used.  We
970                  * will select a slot that is already I/O busy only if there is no
971                  * other choice: a read-busy slot will not be least recently used once
972                  * the read finishes, and waiting for an I/O on a write-busy slot is
973                  * inferior to just picking some other slot.  Testing shows the slot
974                  * we pick instead will often be clean, allowing us to begin a read at
975                  * once.
976                  *
977                  * Normally the page_lru_count values will all be different and so
978                  * there will be a well-defined LRU page.  But since we allow
979                  * concurrent execution of SlruRecentlyUsed() within
980                  * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
981                  * acquire the same lru_count values.  In that case we break ties by
982                  * choosing the furthest-back page.
983                  *
984                  * Notice that this next line forcibly advances cur_lru_count to a
985                  * value that is certainly beyond any value that will be in the
986                  * page_lru_count array after the loop finishes.  This ensures that
987                  * the next execution of SlruRecentlyUsed will mark the page newly
988                  * used, even if it's for a page that has the current counter value.
989                  * That gets us back on the path to having good data when there are
990                  * multiple pages with the same lru_count.
991                  */
992                 cur_count = (shared->cur_lru_count)++;
993                 for (slotno = 0; slotno < shared->num_slots; slotno++)
994                 {
995                         int                     this_delta;
996                         int                     this_page_number;
997
998                         if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
999                                 return slotno;
1000                         this_delta = cur_count - shared->page_lru_count[slotno];
1001                         if (this_delta < 0)
1002                         {
1003                                 /*
1004                                  * Clean up in case shared updates have caused cur_count
1005                                  * increments to get "lost".  We back off the page counts,
1006                                  * rather than trying to increase cur_count, to avoid any
1007                                  * question of infinite loops or failure in the presence of
1008                                  * wrapped-around counts.
1009                                  */
1010                                 shared->page_lru_count[slotno] = cur_count;
1011                                 this_delta = 0;
1012                         }
1013                         this_page_number = shared->page_number[slotno];
1014                         if (this_page_number == shared->latest_page_number)
1015                                 continue;
1016                         if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1017                         {
1018                                 if (this_delta > best_valid_delta ||
1019                                         (this_delta == best_valid_delta &&
1020                                          ctl->PagePrecedes(this_page_number,
1021                                                                            best_valid_page_number)))
1022                                 {
1023                                         bestvalidslot = slotno;
1024                                         best_valid_delta = this_delta;
1025                                         best_valid_page_number = this_page_number;
1026                                 }
1027                         }
1028                         else
1029                         {
1030                                 if (this_delta > best_invalid_delta ||
1031                                         (this_delta == best_invalid_delta &&
1032                                          ctl->PagePrecedes(this_page_number,
1033                                                                            best_invalid_page_number)))
1034                                 {
1035                                         bestinvalidslot = slotno;
1036                                         best_invalid_delta = this_delta;
1037                                         best_invalid_page_number = this_page_number;
1038                                 }
1039                         }
1040                 }
1041
1042                 /*
1043                  * If all pages (except possibly the latest one) are I/O busy, we'll
1044                  * have to wait for an I/O to complete and then retry.  In that
1045                  * unhappy case, we choose to wait for the I/O on the least recently
1046                  * used slot, on the assumption that it was likely initiated first of
1047                  * all the I/Os in progress and may therefore finish first.
1048                  */
1049                 if (best_valid_delta < 0)
1050                 {
1051                         SimpleLruWaitIO(ctl, bestinvalidslot);
1052                         continue;
1053                 }
1054
1055                 /*
1056                  * If the selected page is clean, we're set.
1057                  */
1058                 if (!shared->page_dirty[bestvalidslot])
1059                         return bestvalidslot;
1060
1061                 /*
1062                  * Write the page.
1063                  */
1064                 SlruInternalWritePage(ctl, bestvalidslot, NULL);
1065
1066                 /*
1067                  * Now loop back and try again.  This is the easiest way of dealing
1068                  * with corner cases such as the victim page being re-dirtied while we
1069                  * wrote it.
1070                  */
1071         }
1072 }
1073
1074 /*
1075  * Flush dirty pages to disk during checkpoint or database shutdown
1076  */
1077 void
1078 SimpleLruFlush(SlruCtl ctl, bool checkpoint)
1079 {
1080         SlruShared      shared = ctl->shared;
1081         SlruFlushData fdata;
1082         int                     slotno;
1083         int                     pageno = 0;
1084         int                     i;
1085         bool            ok;
1086
1087         /*
1088          * Find and write dirty pages
1089          */
1090         fdata.num_files = 0;
1091
1092         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1093
1094         for (slotno = 0; slotno < shared->num_slots; slotno++)
1095         {
1096                 SlruInternalWritePage(ctl, slotno, &fdata);
1097
1098                 /*
1099                  * When called during a checkpoint, we cannot assert that the slot is
1100                  * clean now, since another process might have re-dirtied it already.
1101                  * That's okay.
1102                  */
1103                 Assert(checkpoint ||
1104                            shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
1105                            (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1106                                 !shared->page_dirty[slotno]));
1107         }
1108
1109         LWLockRelease(shared->ControlLock);
1110
1111         /*
1112          * Now fsync and close any files that were open
1113          */
1114         ok = true;
1115         for (i = 0; i < fdata.num_files; i++)
1116         {
1117                 if (ctl->do_fsync && pg_fsync(fdata.fd[i]))
1118                 {
1119                         slru_errcause = SLRU_FSYNC_FAILED;
1120                         slru_errno = errno;
1121                         pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1122                         ok = false;
1123                 }
1124
1125                 if (CloseTransientFile(fdata.fd[i]))
1126                 {
1127                         slru_errcause = SLRU_CLOSE_FAILED;
1128                         slru_errno = errno;
1129                         pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1130                         ok = false;
1131                 }
1132         }
1133         if (!ok)
1134                 SlruReportIOError(ctl, pageno, InvalidTransactionId);
1135 }
1136
1137 /*
1138  * Remove all segments before the one holding the passed page number
1139  */
1140 void
1141 SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
1142 {
1143         SlruShared      shared = ctl->shared;
1144         int                     slotno;
1145
1146         /*
1147          * The cutoff point is the start of the segment containing cutoffPage.
1148          */
1149         cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1150
1151         /*
1152          * Scan shared memory and remove any pages preceding the cutoff page, to
1153          * ensure we won't rewrite them later.  (Since this is normally called in
1154          * or just after a checkpoint, any dirty pages should have been flushed
1155          * already ... we're just being extra careful here.)
1156          */
1157         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1158
1159 restart:;
1160
1161         /*
1162          * While we are holding the lock, make an important safety check: the
1163          * planned cutoff point must be <= the current endpoint page. Otherwise we
1164          * have already wrapped around, and proceeding with the truncation would
1165          * risk removing the current segment.
1166          */
1167         if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
1168         {
1169                 LWLockRelease(shared->ControlLock);
1170                 ereport(LOG,
1171                   (errmsg("could not truncate directory \"%s\": apparent wraparound",
1172                                   ctl->Dir)));
1173                 return;
1174         }
1175
1176         for (slotno = 0; slotno < shared->num_slots; slotno++)
1177         {
1178                 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1179                         continue;
1180                 if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
1181                         continue;
1182
1183                 /*
1184                  * If page is clean, just change state to EMPTY (expected case).
1185                  */
1186                 if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1187                         !shared->page_dirty[slotno])
1188                 {
1189                         shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1190                         continue;
1191                 }
1192
1193                 /*
1194                  * Hmm, we have (or may have) I/O operations acting on the page, so
1195                  * we've got to wait for them to finish and then start again. This is
1196                  * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
1197                  * wouldn't it be OK to just discard it without writing it?  For now,
1198                  * keep the logic the same as it was.)
1199                  */
1200                 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1201                         SlruInternalWritePage(ctl, slotno, NULL);
1202                 else
1203                         SimpleLruWaitIO(ctl, slotno);
1204                 goto restart;
1205         }
1206
1207         LWLockRelease(shared->ControlLock);
1208
1209         /* Now we can remove the old segment(s) */
1210         (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
1211 }
1212
1213 void
1214 SlruDeleteSegment(SlruCtl ctl, char *filename)
1215 {
1216         char            path[MAXPGPATH];
1217
1218         snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
1219         ereport(DEBUG2,
1220                         (errmsg("removing file \"%s\"", path)));
1221         unlink(path);
1222 }
1223
1224 /*
1225  * SlruScanDirectory callback
1226  *              This callback reports true if there's any segment prior to the one
1227  *              containing the page passed as "data".
1228  */
1229 bool
1230 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
1231 {
1232         int                     cutoffPage = *(int *) data;
1233
1234         cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1235
1236         if (ctl->PagePrecedes(segpage, cutoffPage))
1237                 return true;                    /* found one; don't iterate any more */
1238
1239         return false;                           /* keep going */
1240 }
1241
1242 /*
1243  * SlruScanDirectory callback.
1244  *              This callback deletes segments prior to the one passed in as "data".
1245  */
1246 static bool
1247 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
1248 {
1249         int                     cutoffPage = *(int *) data;
1250
1251         if (ctl->PagePrecedes(segpage, cutoffPage))
1252                 SlruDeleteSegment(ctl, filename);
1253
1254         return false;                           /* keep going */
1255 }
1256
1257 /*
1258  * SlruScanDirectory callback.
1259  *              This callback deletes all segments.
1260  */
1261 bool
1262 SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
1263 {
1264         SlruDeleteSegment(ctl, filename);
1265
1266         return false;                           /* keep going */
1267 }
1268
1269 /*
1270  * Scan the SimpleLRU directory and apply a callback to each file found in it.
1271  *
1272  * If the callback returns true, the scan is stopped.  The last return value
1273  * from the callback is returned.
1274  *
1275  * The callback receives the following arguments: 1. the SlruCtl struct for the
1276  * slru being truncated; 2. the filename being considered; 3. the page number
1277  * for the first page of that file; 4. a pointer to the opaque data given to us
1278  * by the caller.
1279  *
1280  * Note that the ordering in which the directory is scanned is not guaranteed.
1281  *
1282  * Note that no locking is applied.
1283  */
1284 bool
1285 SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1286 {
1287         bool            retval = false;
1288         DIR                *cldir;
1289         struct dirent *clde;
1290         int                     segno;
1291         int                     segpage;
1292
1293         cldir = AllocateDir(ctl->Dir);
1294         while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1295         {
1296                 size_t          len;
1297
1298                 len = strlen(clde->d_name);
1299
1300                 if ((len == 4 || len == 5 || len == 6) &&
1301                         strspn(clde->d_name, "0123456789ABCDEF") == len)
1302                 {
1303                         segno = (int) strtol(clde->d_name, NULL, 16);
1304                         segpage = segno * SLRU_PAGES_PER_SEGMENT;
1305
1306                         elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1307                                  ctl->Dir, clde->d_name);
1308                         retval = callback(ctl, clde->d_name, segpage, data);
1309                         if (retval)
1310                                 break;
1311                 }
1312         }
1313         FreeDir(cldir);
1314
1315         return retval;
1316 }