1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *              and pin it so that no one can destroy it while this process
20  *              is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *              The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *              freelist.c -- chooses victim for buffer replacement
29  *              buf_table.c -- manages the buffer lookup table
30  */
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "catalog/catalog.h"
37 #include "catalog/storage.h"
38 #include "executor/instrument.h"
39 #include "miscadmin.h"
40 #include "pg_trace.h"
41 #include "pgstat.h"
42 #include "postmaster/bgwriter.h"
43 #include "storage/buf_internals.h"
44 #include "storage/bufmgr.h"
45 #include "storage/ipc.h"
46 #include "storage/proc.h"
47 #include "storage/smgr.h"
48 #include "storage/standby.h"
49 #include "utils/rel.h"
50 #include "utils/resowner_private.h"
51 #include "utils/timestamp.h"
52
53
54 /* Note: these two macros only work on shared buffers, not local ones! */
55 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
56 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
57
58 /* Note: this macro only works on local buffers, not shared ones! */
59 #define LocalBufHdrGetBlock(bufHdr) \
60         LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
61
62 /* Bits in SyncOneBuffer's return value */
63 #define BUF_WRITTEN                             0x01
64 #define BUF_REUSABLE                    0x02
65
66 #define DROP_RELS_BSEARCH_THRESHOLD             20
67
68 /* GUC variables */
69 bool            zero_damaged_pages = false;
70 int                     bgwriter_lru_maxpages = 100;
71 double          bgwriter_lru_multiplier = 2.0;
72 bool            track_io_timing = false;
73
74 /*
75  * How many buffers PrefetchBuffer callers should try to stay ahead of their
76  * ReadBuffer calls by.  This is maintained by the assign hook for
77  * effective_io_concurrency.  Zero means "never prefetch".
78  */
79 int                     target_prefetch_pages = 0;
80
81 /* local state for StartBufferIO and related functions */
82 static volatile BufferDesc *InProgressBuf = NULL;
83 static bool IsForInput;
84
85 /* local state for LockBufferForCleanup */
86 static volatile BufferDesc *PinCountWaitBuf = NULL;
87
88
89 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
90                                   ForkNumber forkNum, BlockNumber blockNum,
91                                   ReadBufferMode mode, BufferAccessStrategy strategy,
92                                   bool *hit);
93 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
94 static void PinBuffer_Locked(volatile BufferDesc *buf);
95 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
96 static void BufferSync(int flags);
97 static int      SyncOneBuffer(int buf_id, bool skip_recently_used);
98 static void WaitIO(volatile BufferDesc *buf);
99 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
100 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
101                                   int set_flag_bits);
102 static void shared_buffer_write_error_callback(void *arg);
103 static void local_buffer_write_error_callback(void *arg);
104 static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
105                         char relpersistence,
106                         ForkNumber forkNum,
107                         BlockNumber blockNum,
108                         BufferAccessStrategy strategy,
109                         bool *foundPtr);
110 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
111 static void AtProcExit_Buffers(int code, Datum arg);
112 static void CheckForBufferLeaks(void);
113 static int      rnode_comparator(const void *p1, const void *p2);
114
115
116 /*
117  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
118  *
119  * This is named by analogy to ReadBuffer but doesn't actually allocate a
120  * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
121  * block will not be delayed by the I/O.  Prefetching is optional.
122  * No-op if prefetching isn't compiled in.
123  */
124 void
125 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
126 {
127 #ifdef USE_PREFETCH
128         Assert(RelationIsValid(reln));
129         Assert(BlockNumberIsValid(blockNum));
130
131         /* Open it at the smgr level if not already done */
132         RelationOpenSmgr(reln);
133
134         if (RelationUsesLocalBuffers(reln))
135         {
136                 /* see comments in ReadBufferExtended */
137                 if (RELATION_IS_OTHER_TEMP(reln))
138                         ereport(ERROR,
139                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
140                                 errmsg("cannot access temporary tables of other sessions")));
141
142                 /* pass it off to localbuf.c */
143                 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
144         }
145         else
146         {
147                 BufferTag       newTag;         /* identity of requested block */
148                 uint32          newHash;        /* hash value for newTag */
149                 LWLock     *newPartitionLock;   /* buffer partition lock for it */
150                 int                     buf_id;
151
152                 /* create a tag so we can lookup the buffer */
153                 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
154                                            forkNum, blockNum);
155
156                 /* determine its hash code and partition lock ID */
157                 newHash = BufTableHashCode(&newTag);
158                 newPartitionLock = BufMappingPartitionLock(newHash);
159
160                 /* see if the block is in the buffer pool already */
161                 LWLockAcquire(newPartitionLock, LW_SHARED);
162                 buf_id = BufTableLookup(&newTag, newHash);
163                 LWLockRelease(newPartitionLock);
164
165                 /* If not in buffers, initiate prefetch */
166                 if (buf_id < 0)
167                         smgrprefetch(reln->rd_smgr, forkNum, blockNum);
168
169                 /*
170                  * If the block *is* in buffers, we do nothing.  This is not really
171                  * ideal: the block might be just about to be evicted, which would be
172                  * stupid since we know we are going to need it soon.  But the only
173                  * easy answer is to bump the usage_count, which does not seem like a
174                  * great solution: when the caller does ultimately touch the block,
175                  * usage_count would get bumped again, resulting in too much
176                  * favoritism for blocks that are involved in a prefetch sequence. A
177                  * real fix would involve some additional per-buffer state, and it's
178                  * not clear that there's enough of a problem to justify that.
179                  */
180         }
181 #endif   /* USE_PREFETCH */
182 }
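
/*
 * Illustrative sketch (not part of this file): how a sequential reader
 * might use PrefetchBuffer to stay ahead of its ReadBuffer calls by the
 * distance suggested in the target_prefetch_pages comment above.  The
 * function name and loop structure here are hypothetical.
 */
#ifdef NOT_USED
static void
example_prefetching_scan(Relation rel, BlockNumber nblocks)
{
	BlockNumber blkno;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		/* issue a prefetch for a block we expect to need shortly */
		if (blkno + target_prefetch_pages < nblocks)
			PrefetchBuffer(rel, MAIN_FORKNUM, blkno + target_prefetch_pages);

		/* read the current block, process it, and release the pin */
		{
			Buffer		buf = ReadBuffer(rel, blkno);

			/* ... examine the page under an appropriate content lock ... */
			ReleaseBuffer(buf);
		}
	}
}
#endif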
183
184
185 /*
186  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
187  *              fork with RBM_NORMAL mode and default strategy.
188  */
189 Buffer
190 ReadBuffer(Relation reln, BlockNumber blockNum)
191 {
192         return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
193 }
194
195 /*
196  * ReadBufferExtended -- returns a buffer containing the requested
197  *              block of the requested relation.  If the blknum
198  *              requested is P_NEW, extend the relation file and
199  *              allocate a new block.  (Caller is responsible for
200  *              ensuring that only one backend tries to extend a
201  *              relation at the same time!)
202  *
203  * Returns: the buffer number for the buffer containing
204  *              the block read.  The returned buffer has been pinned.
205  *              Does not return on error --- elog's instead.
206  *
207  * Assumes that reln has already been opened when this function is called.
208  *
209  * In RBM_NORMAL mode, the page is read from disk, and the page header is
210  * validated.  An error is thrown if the page header is not valid.  (But
211  * note that an all-zero page is considered "valid"; see PageIsVerified().)
212  *
213  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
214  * valid, the page is zeroed instead of throwing an error. This is intended
215  * for non-critical data, where the caller is prepared to repair errors.
216  *
217  * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
218  * with zeros instead of reading it from disk.  Useful when the caller is
219  * going to fill the page from scratch, since this saves I/O and avoids
220  * unnecessary failure if the page-on-disk has corrupt page headers.
221  * Caution: do not use this mode to read a page that is beyond the relation's
222  * current physical EOF; that is likely to cause problems in md.c when
223  * the page is modified and written out. P_NEW is OK, though.
224  *
225  * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
226  *
227  * If strategy is not NULL, a nondefault buffer access strategy is used.
228  * See buffer/README for details.
229  */
230 Buffer
231 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
232                                    ReadBufferMode mode, BufferAccessStrategy strategy)
233 {
234         bool            hit;
235         Buffer          buf;
236
237         /* Open it at the smgr level if not already done */
238         RelationOpenSmgr(reln);
239
240         /*
241          * Reject attempts to read non-local temporary relations; we would be
242          * likely to get wrong data since we have no visibility into the owning
243          * session's local buffers.
244          */
245         if (RELATION_IS_OTHER_TEMP(reln))
246                 ereport(ERROR,
247                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
248                                  errmsg("cannot access temporary tables of other sessions")));
249
250         /*
251          * Read the buffer, and update pgstat counters to reflect a cache hit or
252          * miss.
253          */
254         pgstat_count_buffer_read(reln);
255         buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
256                                                         forkNum, blockNum, mode, strategy, &hit);
257         if (hit)
258                 pgstat_count_buffer_hit(reln);
259         return buf;
260 }
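
/*
 * Illustrative sketch (hypothetical caller, not part of this file): reading
 * a block of non-critical data with RBM_ZERO_ON_ERROR and a bulk-read
 * strategy, as described in the ReadBufferExtended comments above.
 */
#ifdef NOT_USED
static void
example_read_tolerant(Relation rel, BlockNumber blkno)
{
	BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
	Buffer		buf;
	Page		page;

	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_ON_ERROR,
							 strategy);
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buf);

	/* a page with a damaged header arrives zero-filled; PageIsNew() spots that */
	if (PageIsNew(page))
		elog(LOG, "block %u of \"%s\" was zeroed",
			 blkno, RelationGetRelationName(rel));

	UnlockReleaseBuffer(buf);
	FreeAccessStrategy(strategy);
}
#endif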
261
262
263 /*
264  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
265  *              a relcache entry for the relation.
266  *
267  * NB: At present, this function may only be used on permanent relations, which
268  * is OK, because we only use it during XLOG replay.  If in the future we
269  * want to use it on temporary or unlogged relations, we could pass additional
270  * parameters.
271  */
272 Buffer
273 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
274                                                   BlockNumber blockNum, ReadBufferMode mode,
275                                                   BufferAccessStrategy strategy)
276 {
277         bool            hit;
278
279         SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
280
281         Assert(InRecovery);
282
283         return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
284                                                          mode, strategy, &hit);
285 }
286
287
288 /*
289  * ReadBuffer_common -- common logic for all ReadBuffer variants
290  *
291  * *hit is set to true if the request was satisfied from shared buffer cache.
292  */
293 static Buffer
294 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
295                                   BlockNumber blockNum, ReadBufferMode mode,
296                                   BufferAccessStrategy strategy, bool *hit)
297 {
298         volatile BufferDesc *bufHdr;
299         Block           bufBlock;
300         bool            found;
301         bool            isExtend;
302         bool            isLocalBuf = SmgrIsTemp(smgr);
303
304         *hit = false;
305
306         /* Make sure we will have room to remember the buffer pin */
307         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
308
309         isExtend = (blockNum == P_NEW);
310
311         TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
312                                                                            smgr->smgr_rnode.node.spcNode,
313                                                                            smgr->smgr_rnode.node.dbNode,
314                                                                            smgr->smgr_rnode.node.relNode,
315                                                                            smgr->smgr_rnode.backend,
316                                                                            isExtend);
317
318         /* Substitute proper block number if caller asked for P_NEW */
319         if (isExtend)
320                 blockNum = smgrnblocks(smgr, forkNum);
321
322         if (isLocalBuf)
323         {
324                 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
325                 if (found)
326                         pgBufferUsage.local_blks_hit++;
327                 else
328                         pgBufferUsage.local_blks_read++;
329         }
330         else
331         {
332                 /*
333                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
334                  * not currently in memory.
335                  */
336                 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
337                                                          strategy, &found);
338                 if (found)
339                         pgBufferUsage.shared_blks_hit++;
340                 else
341                         pgBufferUsage.shared_blks_read++;
342         }
343
344         /* At this point we do NOT hold any locks. */
345
346         /* if it was already in the buffer pool, we're done */
347         if (found)
348         {
349                 if (!isExtend)
350                 {
351                         /* Just need to update stats before we exit */
352                         *hit = true;
353                         VacuumPageHit++;
354
355                         if (VacuumCostActive)
356                                 VacuumCostBalance += VacuumCostPageHit;
357
358                         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
359                                                                                           smgr->smgr_rnode.node.spcNode,
360                                                                                           smgr->smgr_rnode.node.dbNode,
361                                                                                           smgr->smgr_rnode.node.relNode,
362                                                                                           smgr->smgr_rnode.backend,
363                                                                                           isExtend,
364                                                                                           found);
365
366                         return BufferDescriptorGetBuffer(bufHdr);
367                 }
368
369                 /*
370                  * We get here only in the corner case where we are trying to extend
371                  * the relation but we found a pre-existing buffer marked BM_VALID.
372                  * This can happen because mdread doesn't complain about reads beyond
373                  * EOF (when zero_damaged_pages is ON) and so a previous attempt to
374                  * read a block beyond EOF could have left a "valid" zero-filled
375                  * buffer.  Unfortunately, we have also seen this case occurring
376                  * because of buggy Linux kernels that sometimes return an
377                  * lseek(SEEK_END) result that doesn't account for a recent write. In
378                  * that situation, the pre-existing buffer would contain valid data
379                  * that we don't want to overwrite.  Since the legitimate case should
380                  * always have left a zero-filled buffer, complain if not PageIsNew.
381                  */
382                 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
383                 if (!PageIsNew((Page) bufBlock))
384                         ereport(ERROR,
385                          (errmsg("unexpected data beyond EOF in block %u of relation %s",
386                                          blockNum, relpath(smgr->smgr_rnode, forkNum)),
387                           errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
388
389                 /*
390                  * We *must* do smgrextend before succeeding, else the page will not
391                  * be reserved by the kernel, and the next P_NEW call will decide to
392                  * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
393                  * call that BufferAlloc didn't, and proceed.
394                  */
395                 if (isLocalBuf)
396                 {
397                         /* Only need to adjust flags */
398                         Assert(bufHdr->flags & BM_VALID);
399                         bufHdr->flags &= ~BM_VALID;
400                 }
401                 else
402                 {
403                         /*
404                          * Loop to handle the very small possibility that someone re-sets
405                          * BM_VALID between our clearing it and StartBufferIO inspecting
406                          * it.
407                          */
408                         do
409                         {
410                                 LockBufHdr(bufHdr);
411                                 Assert(bufHdr->flags & BM_VALID);
412                                 bufHdr->flags &= ~BM_VALID;
413                                 UnlockBufHdr(bufHdr);
414                         } while (!StartBufferIO(bufHdr, true));
415                 }
416         }
417
418         /*
419          * if we have gotten to this point, we have allocated a buffer for the
420          * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
421          * if it's a shared buffer.
422          *
423          * Note: if smgrextend fails, we will end up with a buffer that is
424          * allocated but not marked BM_VALID.  P_NEW will still select the same
425          * block number (because the relation didn't get any longer on disk) and
426          * so future attempts to extend the relation will find the same buffer (if
427          * it's not been recycled) but come right back here to try smgrextend
428          * again.
429          */
430         Assert(!(bufHdr->flags & BM_VALID));            /* spinlock not needed */
431
432         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
433
434         if (isExtend)
435         {
436                 /* new buffers are zero-filled */
437                 MemSet((char *) bufBlock, 0, BLCKSZ);
438                 /* don't set checksum for all-zero page */
439                 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
440         }
441         else
442         {
443                 /*
444                  * Read in the page, unless the caller intends to overwrite it and
445                  * just wants us to allocate a buffer.
446                  */
447                 if (mode == RBM_ZERO)
448                         MemSet((char *) bufBlock, 0, BLCKSZ);
449                 else
450                 {
451                         instr_time      io_start,
452                                                 io_time;
453
454                         if (track_io_timing)
455                                 INSTR_TIME_SET_CURRENT(io_start);
456
457                         smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
458
459                         if (track_io_timing)
460                         {
461                                 INSTR_TIME_SET_CURRENT(io_time);
462                                 INSTR_TIME_SUBTRACT(io_time, io_start);
463                                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
464                                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
465                         }
466
467                         /* check for garbage data */
468                         if (!PageIsVerified((Page) bufBlock, blockNum))
469                         {
470                                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
471                                 {
472                                         ereport(WARNING,
473                                                         (errcode(ERRCODE_DATA_CORRUPTED),
474                                                          errmsg("invalid page in block %u of relation %s; zeroing out page",
475                                                                         blockNum,
476                                                                         relpath(smgr->smgr_rnode, forkNum))));
477                                         MemSet((char *) bufBlock, 0, BLCKSZ);
478                                 }
479                                 else
480                                         ereport(ERROR,
481                                                         (errcode(ERRCODE_DATA_CORRUPTED),
482                                                          errmsg("invalid page in block %u of relation %s",
483                                                                         blockNum,
484                                                                         relpath(smgr->smgr_rnode, forkNum))));
485                         }
486                 }
487         }
488
489         if (isLocalBuf)
490         {
491                 /* Only need to adjust flags */
492                 bufHdr->flags |= BM_VALID;
493         }
494         else
495         {
496                 /* Set BM_VALID, terminate IO, and wake up any waiters */
497                 TerminateBufferIO(bufHdr, false, BM_VALID);
498         }
499
500         VacuumPageMiss++;
501         if (VacuumCostActive)
502                 VacuumCostBalance += VacuumCostPageMiss;
503
504         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
505                                                                           smgr->smgr_rnode.node.spcNode,
506                                                                           smgr->smgr_rnode.node.dbNode,
507                                                                           smgr->smgr_rnode.node.relNode,
508                                                                           smgr->smgr_rnode.backend,
509                                                                           isExtend,
510                                                                           found);
511
512         return BufferDescriptorGetBuffer(bufHdr);
513 }
514
515 /*
516  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
517  *              buffer.  If no buffer exists already, selects a replacement
518  *              victim and evicts the old page, but does NOT read in new page.
519  *
520  * "strategy" can be a buffer replacement strategy object, or NULL for
521  * the default strategy.  The selected buffer's usage_count is advanced when
522  * using the default strategy, but otherwise possibly not (see PinBuffer).
523  *
524  * The returned buffer is pinned and is already marked as holding the
525  * desired page.  If it already did have the desired page, *foundPtr is
526  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
527  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
528  *
529  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
530  * we keep it for simplicity in ReadBuffer.
531  *
532  * No locks are held either at entry or exit.
533  */
534 static volatile BufferDesc *
535 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
536                         BlockNumber blockNum,
537                         BufferAccessStrategy strategy,
538                         bool *foundPtr)
539 {
540         BufferTag       newTag;                 /* identity of requested block */
541         uint32          newHash;                /* hash value for newTag */
542         LWLock     *newPartitionLock;           /* buffer partition lock for it */
543         BufferTag       oldTag;                 /* previous identity of selected buffer */
544         uint32          oldHash;                /* hash value for oldTag */
545         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
546         BufFlags        oldFlags;
547         int                     buf_id;
548         volatile BufferDesc *buf;
549         bool            valid;
550
551         /* create a tag so we can lookup the buffer */
552         INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
553
554         /* determine its hash code and partition lock ID */
555         newHash = BufTableHashCode(&newTag);
556         newPartitionLock = BufMappingPartitionLock(newHash);
557
558         /* see if the block is in the buffer pool already */
559         LWLockAcquire(newPartitionLock, LW_SHARED);
560         buf_id = BufTableLookup(&newTag, newHash);
561         if (buf_id >= 0)
562         {
563                 /*
564                  * Found it.  Now, pin the buffer so no one can steal it from the
565                  * buffer pool, and check to see if the correct data has been loaded
566                  * into the buffer.
567                  */
568                 buf = &BufferDescriptors[buf_id];
569
570                 valid = PinBuffer(buf, strategy);
571
572                 /* Can release the mapping lock as soon as we've pinned it */
573                 LWLockRelease(newPartitionLock);
574
575                 *foundPtr = TRUE;
576
577                 if (!valid)
578                 {
579                         /*
580                          * We can only get here if (a) someone else is still reading in
581                          * the page, or (b) a previous read attempt failed.  We have to
582                          * wait for any active read attempt to finish, and then set up our
583                          * own read attempt if the page is still not BM_VALID.
584                          * StartBufferIO does it all.
585                          */
586                         if (StartBufferIO(buf, true))
587                         {
588                                 /*
589                                  * If we get here, previous attempts to read the buffer must
590                                  * have failed ... but we shall bravely try again.
591                                  */
592                                 *foundPtr = FALSE;
593                         }
594                 }
595
596                 return buf;
597         }
598
599         /*
600          * Didn't find it in the buffer pool.  We'll have to initialize a new
601          * buffer.  Remember to unlock the mapping lock while doing the work.
602          */
603         LWLockRelease(newPartitionLock);
604
605         /* Loop here in case we have to try another victim buffer */
606         for (;;)
607         {
608                 bool            lock_held;
609
610                 /*
611                  * Select a victim buffer.  The buffer is returned with its header
612                  * spinlock still held!  Also (in most cases) the BufFreelistLock is
613                  * still held, since it would be bad to hold the spinlock while
614                  * possibly waking up other processes.
615                  */
616                 buf = StrategyGetBuffer(strategy, &lock_held);
617
618                 Assert(buf->refcount == 0);
619
620                 /* Must copy buffer flags while we still hold the spinlock */
621                 oldFlags = buf->flags;
622
623                 /* Pin the buffer and then release the buffer spinlock */
624                 PinBuffer_Locked(buf);
625
626                 /* Now it's safe to release the freelist lock */
627                 if (lock_held)
628                         LWLockRelease(BufFreelistLock);
629
630                 /*
631                  * If the buffer was dirty, try to write it out.  There is a race
632                  * condition here, in that someone might dirty it after we released it
633                  * above, or even while we are writing it out (since our share-lock
634                  * won't prevent hint-bit updates).  We will recheck the dirty bit
635                  * after re-locking the buffer header.
636                  */
637                 if (oldFlags & BM_DIRTY)
638                 {
639                         /*
640                          * We need a share-lock on the buffer contents to write it out
641                          * (else we might write invalid data, eg because someone else is
642                          * compacting the page contents while we write).  We must use a
643                          * conditional lock acquisition here to avoid deadlock.  Even
644                          * though the buffer was not pinned (and therefore surely not
645                          * locked) when StrategyGetBuffer returned it, someone else could
646                          * have pinned and exclusive-locked it by the time we get here. If
647                          * we try to get the lock unconditionally, we'd block waiting for
648                          * them; if they later block waiting for us, deadlock ensues.
649                          * (This has been observed to happen when two backends are both
650                          * trying to split btree index pages, and the second one just
651                          * happens to be trying to split the page the first one got from
652                          * StrategyGetBuffer.)
653                          */
654                         if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
655                         {
656                                 /*
657                                  * If using a nondefault strategy, and writing the buffer
658                                  * would require a WAL flush, let the strategy decide whether
659                                  * to go ahead and write/reuse the buffer or to choose another
660                                  * victim.  We need lock to inspect the page LSN, so this
661                                  * can't be done inside StrategyGetBuffer.
662                                  */
663                                 if (strategy != NULL)
664                                 {
665                                         XLogRecPtr      lsn;
666
667                                         /* Read the LSN while holding buffer header lock */
668                                         LockBufHdr(buf);
669                                         lsn = BufferGetLSN(buf);
670                                         UnlockBufHdr(buf);
671
672                                         if (XLogNeedsFlush(lsn) &&
673                                                 StrategyRejectBuffer(strategy, buf))
674                                         {
675                                                 /* Drop lock/pin and loop around for another buffer */
676                                                 LWLockRelease(buf->content_lock);
677                                                 UnpinBuffer(buf, true);
678                                                 continue;
679                                         }
680                                 }
681
682                                 /* OK, do the I/O */
683                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
684                                                                                            smgr->smgr_rnode.node.spcNode,
685                                                                                                 smgr->smgr_rnode.node.dbNode,
686                                                                                           smgr->smgr_rnode.node.relNode);
687
688                                 FlushBuffer(buf, NULL);
689                                 LWLockRelease(buf->content_lock);
690
691                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
692                                                                                            smgr->smgr_rnode.node.spcNode,
693                                                                                                 smgr->smgr_rnode.node.dbNode,
694                                                                                           smgr->smgr_rnode.node.relNode);
695                         }
696                         else
697                         {
698                                 /*
699                                  * Someone else has locked the buffer, so give it up and loop
700                                  * back to get another one.
701                                  */
702                                 UnpinBuffer(buf, true);
703                                 continue;
704                         }
705                 }
706
707                 /*
708                  * To change the association of a valid buffer, we'll need to have
709                  * exclusive lock on both the old and new mapping partitions.
710                  */
711                 if (oldFlags & BM_TAG_VALID)
712                 {
713                         /*
714                          * Need to compute the old tag's hashcode and partition lock ID.
715                          * XXX is it worth storing the hashcode in BufferDesc so we need
716                          * not recompute it here?  Probably not.
717                          */
718                         oldTag = buf->tag;
719                         oldHash = BufTableHashCode(&oldTag);
720                         oldPartitionLock = BufMappingPartitionLock(oldHash);
721
722                         /*
723                          * Must lock the lower-numbered partition first to avoid
724                          * deadlocks.
725                          */
726                         if (oldPartitionLock < newPartitionLock)
727                         {
728                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
729                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
730                         }
731                         else if (oldPartitionLock > newPartitionLock)
732                         {
733                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
734                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
735                         }
736                         else
737                         {
738                                 /* only one partition, only one lock */
739                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
740                         }
741                 }
742                 else
743                 {
744                         /* if it wasn't valid, we need only the new partition */
745                         LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
746                         /* these just keep the compiler quiet about uninit variables */
747                         oldHash = 0;
748                         oldPartitionLock = 0;
749                 }
750
751                 /*
752                  * Try to make a hashtable entry for the buffer under its new tag.
753                  * This could fail because while we were writing someone else
754                  * allocated another buffer for the same block we want to read in.
755                  * Note that we have not yet removed the hashtable entry for the old
756                  * tag.
757                  */
758                 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
759
760                 if (buf_id >= 0)
761                 {
762                         /*
763                          * Got a collision. Someone has already done what we were about to
764                          * do. We'll just handle this as if it were found in the buffer
765                          * pool in the first place.  First, give up the buffer we were
766                          * planning to use.
767                          */
768                         UnpinBuffer(buf, true);
769
770                         /* Can give up that buffer's mapping partition lock now */
771                         if ((oldFlags & BM_TAG_VALID) &&
772                                 oldPartitionLock != newPartitionLock)
773                                 LWLockRelease(oldPartitionLock);
774
775                         /* remaining code should match code at top of routine */
776
777                         buf = &BufferDescriptors[buf_id];
778
779                         valid = PinBuffer(buf, strategy);
780
781                         /* Can release the mapping lock as soon as we've pinned it */
782                         LWLockRelease(newPartitionLock);
783
784                         *foundPtr = TRUE;
785
786                         if (!valid)
787                         {
788                                 /*
789                                  * We can only get here if (a) someone else is still reading
790                                  * in the page, or (b) a previous read attempt failed.  We
791                                  * have to wait for any active read attempt to finish, and
792                                  * then set up our own read attempt if the page is still not
793                                  * BM_VALID.  StartBufferIO does it all.
794                                  */
795                                 if (StartBufferIO(buf, true))
796                                 {
797                                         /*
798                                          * If we get here, previous attempts to read the buffer
799                                          * must have failed ... but we shall bravely try again.
800                                          */
801                                         *foundPtr = FALSE;
802                                 }
803                         }
804
805                         return buf;
806                 }
807
808                 /*
809                  * Need to lock the buffer header too in order to change its tag.
810                  */
811                 LockBufHdr(buf);
812
813                 /*
814                  * Somebody could have pinned or re-dirtied the buffer while we were
815                  * doing the I/O and making the new hashtable entry.  If so, we can't
816                  * recycle this buffer; we must undo everything we've done and start
817                  * over with a new victim buffer.
818                  */
819                 oldFlags = buf->flags;
820                 if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
821                         break;
822
823                 UnlockBufHdr(buf);
824                 BufTableDelete(&newTag, newHash);
825                 if ((oldFlags & BM_TAG_VALID) &&
826                         oldPartitionLock != newPartitionLock)
827                         LWLockRelease(oldPartitionLock);
828                 LWLockRelease(newPartitionLock);
829                 UnpinBuffer(buf, true);
830         }
831
832         /*
833          * Okay, it's finally safe to rename the buffer.
834          *
835          * Clearing BM_VALID here is necessary, clearing the dirtybits is just
836          * paranoia.  We also reset the usage_count since any recency of use of
837          * the old content is no longer relevant.  (The usage_count starts out at
838          * 1 so that the buffer can survive one clock-sweep pass.)
839          */
840         buf->tag = newTag;
841         buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
842         if (relpersistence == RELPERSISTENCE_PERMANENT)
843                 buf->flags |= BM_TAG_VALID | BM_PERMANENT;
844         else
845                 buf->flags |= BM_TAG_VALID;
846         buf->usage_count = 1;
847
848         UnlockBufHdr(buf);
849
850         if (oldFlags & BM_TAG_VALID)
851         {
852                 BufTableDelete(&oldTag, oldHash);
853                 if (oldPartitionLock != newPartitionLock)
854                         LWLockRelease(oldPartitionLock);
855         }
856
857         LWLockRelease(newPartitionLock);
858
859         /*
860          * Buffer contents are currently invalid.  Try to get the io_in_progress
861          * lock.  If StartBufferIO returns false, then someone else managed to
862          * read it before we did, so there's nothing left for BufferAlloc() to do.
863          */
864         if (StartBufferIO(buf, true))
865                 *foundPtr = FALSE;
866         else
867                 *foundPtr = TRUE;
868
869         return buf;
870 }
871
872 /*
873  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
874  * freelist.
875  *
876  * The buffer header spinlock must be held at entry.  We drop it before
877  * returning.  (This is sane because the caller must have locked the
878  * buffer in order to be sure it should be dropped.)
879  *
880  * This is used only in contexts such as dropping a relation.  We assume
881  * that no other backend could possibly be interested in using the page,
882  * so the only reason the buffer might be pinned is if someone else is
883  * trying to write it out.  We have to let them finish before we can
884  * reclaim the buffer.
885  *
886  * The buffer could get reclaimed by someone else while we are waiting
887  * to acquire the necessary locks; if so, don't mess it up.
888  */
889 static void
890 InvalidateBuffer(volatile BufferDesc *buf)
891 {
892         BufferTag       oldTag;
893         uint32          oldHash;                /* hash value for oldTag */
894         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
895         BufFlags        oldFlags;
896
897         /* Save the original buffer tag before dropping the spinlock */
898         oldTag = buf->tag;
899
900         UnlockBufHdr(buf);
901
902         /*
903          * Need to compute the old tag's hashcode and partition lock ID. XXX is it
904          * worth storing the hashcode in BufferDesc so we need not recompute it
905          * here?  Probably not.
906          */
907         oldHash = BufTableHashCode(&oldTag);
908         oldPartitionLock = BufMappingPartitionLock(oldHash);
909
910 retry:
911
912         /*
913          * Acquire exclusive mapping lock in preparation for changing the buffer's
914          * association.
915          */
916         LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
917
918         /* Re-lock the buffer header */
919         LockBufHdr(buf);
920
921         /* If it's changed while we were waiting for lock, do nothing */
922         if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
923         {
924                 UnlockBufHdr(buf);
925                 LWLockRelease(oldPartitionLock);
926                 return;
927         }
928
929         /*
930          * We assume the only reason for it to be pinned is that someone else is
931          * flushing the page out.  Wait for them to finish.  (This could be an
932          * infinite loop if the refcount is messed up... it would be nice to time
933  * out after a while, but there seems no way to be sure how many loops may
934          * be needed.  Note that if the other guy has pinned the buffer but not
935          * yet done StartBufferIO, WaitIO will fall through and we'll effectively
936          * be busy-looping here.)
937          */
938         if (buf->refcount != 0)
939         {
940                 UnlockBufHdr(buf);
941                 LWLockRelease(oldPartitionLock);
942                 /* safety check: should definitely not be our *own* pin */
943                 if (PrivateRefCount[buf->buf_id] != 0)
944                         elog(ERROR, "buffer is pinned in InvalidateBuffer");
945                 WaitIO(buf);
946                 goto retry;
947         }
948
949         /*
950          * Clear out the buffer's tag and flags.  We must do this to ensure that
951          * linear scans of the buffer array don't think the buffer is valid.
952          */
953         oldFlags = buf->flags;
954         CLEAR_BUFFERTAG(buf->tag);
955         buf->flags = 0;
956         buf->usage_count = 0;
957
958         UnlockBufHdr(buf);
959
960         /*
961          * Remove the buffer from the lookup hashtable, if it was in there.
962          */
963         if (oldFlags & BM_TAG_VALID)
964                 BufTableDelete(&oldTag, oldHash);
965
966         /*
967          * Done with mapping lock.
968          */
969         LWLockRelease(oldPartitionLock);
970
971         /*
972          * Insert the buffer at the head of the list of free buffers.
973          */
974         StrategyFreeBuffer(buf);
975 }
976
977 /*
978  * MarkBufferDirty
979  *
980  *              Marks buffer contents as dirty (actual write happens later).
981  *
982  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
983  * exclusive lock, then somebody could be in process of writing the buffer,
984  * leading to risk of bad data written to disk.)
985  */
986 void
987 MarkBufferDirty(Buffer buffer)
988 {
989         volatile BufferDesc *bufHdr;
990
991         if (!BufferIsValid(buffer))
992                 elog(ERROR, "bad buffer ID: %d", buffer);
993
994         if (BufferIsLocal(buffer))
995         {
996                 MarkLocalBufferDirty(buffer);
997                 return;
998         }
999
1000         bufHdr = &BufferDescriptors[buffer - 1];
1001
1002         Assert(PrivateRefCount[buffer - 1] > 0);
1003         /* unfortunately we can't check if the lock is held exclusively */
1004         Assert(LWLockHeldByMe(bufHdr->content_lock));
1005
1006         LockBufHdr(bufHdr);
1007
1008         Assert(bufHdr->refcount > 0);
1009
1010         /*
1011          * If the buffer was not dirty already, do vacuum accounting.
1012          */
1013         if (!(bufHdr->flags & BM_DIRTY))
1014         {
1015                 VacuumPageDirty++;
1016                 pgBufferUsage.shared_blks_dirtied++;
1017                 if (VacuumCostActive)
1018                         VacuumCostBalance += VacuumCostPageDirty;
1019         }
1020
1021         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1022
1023         UnlockBufHdr(bufHdr);
1024 }
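
/*
 * Illustrative sketch (hypothetical caller): the pin/lock discipline that
 * MarkBufferDirty expects.  In real callers the page modification would be
 * WAL-logged inside a critical section; that part is elided here.
 */
#ifdef NOT_USED
static void
example_mark_dirty(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);	/* pins the buffer */

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);		/* exclusive content lock */

	/* ... scribble on BufferGetPage(buf) here ... */

	MarkBufferDirty(buf);						/* physical write happens later */
	UnlockReleaseBuffer(buf);					/* drop content lock and pin */
}
#endif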
1025
1026 /*
1027  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1028  *
1029  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1030  * compared to calling the two routines separately.  Now it's mainly just
1031  * a convenience function.  However, if the passed buffer is valid and
1032  * already contains the desired block, we just return it as-is; and that
1033  * does save considerable work compared to a full release and reacquire.
1034  *
1035  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1036  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1037  * but can save some tests in the caller.
1038  */
1039 Buffer
1040 ReleaseAndReadBuffer(Buffer buffer,
1041                                          Relation relation,
1042                                          BlockNumber blockNum)
1043 {
1044         ForkNumber      forkNum = MAIN_FORKNUM;
1045         volatile BufferDesc *bufHdr;
1046
1047         if (BufferIsValid(buffer))
1048         {
1049                 if (BufferIsLocal(buffer))
1050                 {
1051                         Assert(LocalRefCount[-buffer - 1] > 0);
1052                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
1053                         if (bufHdr->tag.blockNum == blockNum &&
1054                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1055                                 bufHdr->tag.forkNum == forkNum)
1056                                 return buffer;
1057                         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1058                         LocalRefCount[-buffer - 1]--;
1059                 }
1060                 else
1061                 {
1062                         Assert(PrivateRefCount[buffer - 1] > 0);
1063                         bufHdr = &BufferDescriptors[buffer - 1];
1064                         /* we have pin, so it's ok to examine tag without spinlock */
1065                         if (bufHdr->tag.blockNum == blockNum &&
1066                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1067                                 bufHdr->tag.forkNum == forkNum)
1068                                 return buffer;
1069                         UnpinBuffer(bufHdr, true);
1070                 }
1071         }
1072
1073         return ReadBuffer(relation, blockNum);
1074 }
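
/*
 * Illustrative sketch (hypothetical caller): stepping through consecutive
 * blocks with ReleaseAndReadBuffer, which is cheap when the buffer already
 * holds the requested block.
 */
#ifdef NOT_USED
static void
example_walk_blocks(Relation rel, BlockNumber nblocks)
{
	Buffer		buf = InvalidBuffer;	/* nothing to release on the first call */
	BlockNumber blkno;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		buf = ReleaseAndReadBuffer(buf, rel, blkno);
		/* ... examine the page under an appropriate content lock ... */
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
}
#endif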
1075
1076 /*
1077  * PinBuffer -- make buffer unavailable for replacement.
1078  *
1079  * For the default access strategy, the buffer's usage_count is incremented
1080  * when we first pin it; for other strategies we just make sure the usage_count
1081  * isn't zero.  (The idea of the latter is that we don't want synchronized
1082  * heap scans to inflate the count, but we need it to not be zero to discourage
1083  * other backends from stealing buffers from our ring.  As long as we cycle
1084  * through the ring faster than the global clock-sweep cycles, buffers in
1085  * our ring won't be chosen as victims for replacement by other backends.)
1086  *
1087  * This should be applied only to shared buffers, never local ones.
1088  *
1089  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1090  *
1091  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
1092  * some callers to avoid an extra spinlock cycle.
1093  */
1094 static bool
1095 PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
1096 {
1097         int                     b = buf->buf_id;
1098         bool            result;
1099
1100         if (PrivateRefCount[b] == 0)
1101         {
1102                 LockBufHdr(buf);
1103                 buf->refcount++;
1104                 if (strategy == NULL)
1105                 {
1106                         if (buf->usage_count < BM_MAX_USAGE_COUNT)
1107                                 buf->usage_count++;
1108                 }
1109                 else
1110                 {
1111                         if (buf->usage_count == 0)
1112                                 buf->usage_count = 1;
1113                 }
1114                 result = (buf->flags & BM_VALID) != 0;
1115                 UnlockBufHdr(buf);
1116         }
1117         else
1118         {
1119                 /* If we previously pinned the buffer, it must surely be valid */
1120                 result = true;
1121         }
1122         PrivateRefCount[b]++;
1123         Assert(PrivateRefCount[b] > 0);
1124         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1125                                                                 BufferDescriptorGetBuffer(buf));
1126         return result;
1127 }
1128
1129 /*
1130  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1131  * The spinlock is released before return.
1132  *
1133  * Currently, no callers of this function want to modify the buffer's
1134  * usage_count at all, so there's no need for a strategy parameter.
1135  * Also we don't bother with a BM_VALID test (the caller could check that for
1136  * itself).
1137  *
1138  * Note: use of this routine is frequently mandatory, not just an optimization
1139  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1140  * its state can change under us.
1141  */
1142 static void
1143 PinBuffer_Locked(volatile BufferDesc *buf)
1144 {
1145         int                     b = buf->buf_id;
1146
1147         if (PrivateRefCount[b] == 0)
1148                 buf->refcount++;
1149         UnlockBufHdr(buf);
1150         PrivateRefCount[b]++;
1151         Assert(PrivateRefCount[b] > 0);
1152         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1153                                                                 BufferDescriptorGetBuffer(buf));
1154 }
1155
1156 /*
1157  * UnpinBuffer -- make buffer available for replacement.
1158  *
1159  * This should be applied only to shared buffers, never local ones.
1160  *
1161  * Most but not all callers want CurrentResourceOwner to be adjusted.
1162  * Those that don't should pass fixOwner = FALSE.
1163  */
1164 static void
1165 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
1166 {
1167         int                     b = buf->buf_id;
1168
1169         if (fixOwner)
1170                 ResourceOwnerForgetBuffer(CurrentResourceOwner,
1171                                                                   BufferDescriptorGetBuffer(buf));
1172
1173         Assert(PrivateRefCount[b] > 0);
1174         PrivateRefCount[b]--;
1175         if (PrivateRefCount[b] == 0)
1176         {
1177                 /* I'd better not still hold any locks on the buffer */
1178                 Assert(!LWLockHeldByMe(buf->content_lock));
1179                 Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
1180
1181                 LockBufHdr(buf);
1182
1183                 /* Decrement the shared reference count */
1184                 Assert(buf->refcount > 0);
1185                 buf->refcount--;
1186
1187                 /* Support LockBufferForCleanup() */
1188                 if ((buf->flags & BM_PIN_COUNT_WAITER) &&
1189                         buf->refcount == 1)
1190                 {
1191                         /* we just released the last pin other than the waiter's */
1192                         int                     wait_backend_pid = buf->wait_backend_pid;
1193
1194                         buf->flags &= ~BM_PIN_COUNT_WAITER;
1195                         UnlockBufHdr(buf);
1196                         ProcSendSignal(wait_backend_pid);
1197                 }
1198                 else
1199                         UnlockBufHdr(buf);
1200         }
1201 }
1202
1203 /*
1204  * BufferSync -- Write out all dirty buffers in the pool.
1205  *
1206  * This is called at checkpoint time to write out all dirty shared buffers.
1207  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1208  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN is
1209  * set, we write even unlogged buffers, which are otherwise skipped.  The
1210  * remaining flags currently have no effect here.
1211  */
1212 static void
1213 BufferSync(int flags)
1214 {
1215         int                     buf_id;
1216         int                     num_to_scan;
1217         int                     num_to_write;
1218         int                     num_written;
1219         int                     mask = BM_DIRTY;
1220
1221         /* Make sure we can handle the pin inside SyncOneBuffer */
1222         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1223
1224         /*
1225          * Unless this is a shutdown checkpoint, we write only permanent, dirty
1226          * buffers.  But at shutdown or end of recovery, we write all dirty
1227          * buffers.
1228          */
1229         if (!((flags & CHECKPOINT_IS_SHUTDOWN) || (flags & CHECKPOINT_END_OF_RECOVERY)))
1230                 mask |= BM_PERMANENT;
1231
1232         /*
1233          * Loop over all buffers, and mark the ones that need to be written with
1234          * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
1235          * can estimate how much work needs to be done.
1236          *
1237          * This allows us to write only those pages that were dirty when the
1238          * checkpoint began, and not those that get dirtied while it proceeds.
1239          * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1240          * later in this function, or by normal backends or the bgwriter cleaning
1241          * scan, the flag is cleared.  Any buffer dirtied after this point won't
1242          * have the flag set.
1243          *
1244          * Note that if we fail to write some buffer, we may leave buffers with
1245          * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
1246          * certainly need to be written for the next checkpoint attempt, too.
1247          */
1248         num_to_write = 0;
1249         for (buf_id = 0; buf_id < NBuffers; buf_id++)
1250         {
1251                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1252
1253                 /*
1254                  * Header spinlock is enough to examine BM_DIRTY, see comment in
1255                  * SyncOneBuffer.
1256                  */
1257                 LockBufHdr(bufHdr);
1258
1259                 if ((bufHdr->flags & mask) == mask)
1260                 {
1261                         bufHdr->flags |= BM_CHECKPOINT_NEEDED;
1262                         num_to_write++;
1263                 }
1264
1265                 UnlockBufHdr(bufHdr);
1266         }
1267
1268         if (num_to_write == 0)
1269                 return;                                 /* nothing to do */
1270
1271         TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
1272
1273         /*
1274          * Loop over all buffers again, and write the ones (still) marked with
1275          * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
1276          * since we might as well dump soon-to-be-recycled buffers first.
1277          *
1278          * Note that we don't read the buffer alloc count here --- that should be
1279          * left untouched till the next BgBufferSync() call.
1280          */
1281         buf_id = StrategySyncStart(NULL, NULL);
1282         num_to_scan = NBuffers;
1283         num_written = 0;
1284         while (num_to_scan-- > 0)
1285         {
1286                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1287
1288                 /*
1289                  * We don't need to acquire the lock here, because we're only looking
1290                  * at a single bit. It's possible that someone else writes the buffer
1291                  * and clears the flag right after we check, but that doesn't matter
1292                  * since SyncOneBuffer will then do nothing.  However, there is a
1293                  * further race condition: it's conceivable that between the time we
1294                  * examine the bit here and the time SyncOneBuffer acquires lock,
1295                  * someone else not only wrote the buffer but replaced it with another
1296                  * page and dirtied it.  In that improbable case, SyncOneBuffer will
1297                  * write the buffer even though we didn't need to.  It doesn't seem worth
1298                  * guarding against this, though.
1299                  */
1300                 if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
1301                 {
1302                         if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
1303                         {
1304                                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1305                                 BgWriterStats.m_buf_written_checkpoints++;
1306                                 num_written++;
1307
1308                                 /*
1309                                  * We know there are at most num_to_write buffers with
1310                                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
1311                                  * num_written reaches num_to_write.
1312                                  *
1313                                  * Note that num_written doesn't include buffers written by
1314                                  * other backends, or by the bgwriter cleaning scan. That
1315                                  * means that the estimate of how much progress we've made is
1316                                  * conservative, and also that this test will often fail to
1317                                  * trigger.  But it seems worth making anyway.
1318                                  */
1319                                 if (num_written >= num_to_write)
1320                                         break;
1321
1322                                 /*
1323                                  * Sleep to throttle our I/O rate.
1324                                  */
1325                                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
1326                         }
1327                 }
1328
1329                 if (++buf_id >= NBuffers)
1330                         buf_id = 0;
1331         }
1332
1333         /*
1334          * Update checkpoint statistics. As noted above, this doesn't include
1335          * buffers written by other backends or bgwriter scan.
1336          */
1337         CheckpointStats.ckpt_bufs_written += num_written;
1338
1339         TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
1340 }
1341
1342 /*
1343  * BgBufferSync -- Write out some dirty buffers in the pool.
1344  *
1345  * This is called periodically by the background writer process.
1346  *
1347  * Returns true if it's appropriate for the bgwriter process to go into
1348  * low-power hibernation mode.  (This happens if the strategy clock sweep
1349  * has been "lapped" and no buffer allocations have occurred recently,
1350  * or if the bgwriter has been effectively disabled by setting
1351  * bgwriter_lru_maxpages to 0.)
1352  */
1353 bool
1354 BgBufferSync(void)
1355 {
1356         /* info obtained from freelist.c */
1357         int                     strategy_buf_id;
1358         uint32          strategy_passes;
1359         uint32          recent_alloc;
1360
1361         /*
1362          * Information saved between calls so we can determine the strategy
1363          * point's advance rate and avoid scanning already-cleaned buffers.
1364          */
1365         static bool saved_info_valid = false;
1366         static int      prev_strategy_buf_id;
1367         static uint32 prev_strategy_passes;
1368         static int      next_to_clean;
1369         static uint32 next_passes;
1370
1371         /* Moving averages of allocation rate and clean-buffer density */
1372         static float smoothed_alloc = 0;
1373         static float smoothed_density = 10.0;
1374
1375         /* Potentially these could be tunables, but for now, not */
1376         float           smoothing_samples = 16;
1377         float           scan_whole_pool_milliseconds = 120000.0;
1378
1379         /* Used to compute how far we scan ahead */
1380         long            strategy_delta;
1381         int                     bufs_to_lap;
1382         int                     bufs_ahead;
1383         float           scans_per_alloc;
1384         int                     reusable_buffers_est;
1385         int                     upcoming_alloc_est;
1386         int                     min_scan_buffers;
1387
1388         /* Variables for the scanning loop proper */
1389         int                     num_to_scan;
1390         int                     num_written;
1391         int                     reusable_buffers;
1392
1393         /* Variables for final smoothed_density update */
1394         long            new_strategy_delta;
1395         uint32          new_recent_alloc;
1396
1397         /*
1398          * Find out where the freelist clock sweep currently is, and how many
1399          * buffer allocations have happened since our last call.
1400          */
1401         strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
1402
1403         /* Report buffer alloc counts to pgstat */
1404         BgWriterStats.m_buf_alloc += recent_alloc;
1405
1406         /*
1407          * If we're not running the LRU scan, just stop after doing the stats
1408          * stuff.  We mark the saved state invalid so that we can recover sanely
1409          * if LRU scan is turned back on later.
1410          */
1411         if (bgwriter_lru_maxpages <= 0)
1412         {
1413                 saved_info_valid = false;
1414                 return true;
1415         }
1416
1417         /*
1418          * Compute strategy_delta = how many buffers have been scanned by the
1419          * clock sweep since last time.  If first time through, assume none. Then
1420          * see if we are still ahead of the clock sweep, and if so, how many
1421          * buffers we could scan before we'd catch up with it and "lap" it. Note:
1422  * weird-looking coding of xxx_passes comparisons is to avoid bogus
1423          * behavior when the passes counts wrap around.
1424          */
1425         if (saved_info_valid)
1426         {
1427                 int32           passes_delta = strategy_passes - prev_strategy_passes;
1428
1429                 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
1430                 strategy_delta += (long) passes_delta * NBuffers;
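                /*
                 * Worked example of the wraparound-safe arithmetic (illustrative
                 * values, not from the source): if prev_strategy_passes was
                 * UINT32_MAX and strategy_passes has since wrapped around to 0,
                 * the unsigned subtraction yields 1 when stored in the int32
                 * passes_delta, so strategy_delta still advances by one full lap
                 * of NBuffers instead of going hugely negative.
                 */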
1431
1432                 Assert(strategy_delta >= 0);
1433
1434                 if ((int32) (next_passes - strategy_passes) > 0)
1435                 {
1436                         /* we're one pass ahead of the strategy point */
1437                         bufs_to_lap = strategy_buf_id - next_to_clean;
1438 #ifdef BGW_DEBUG
1439                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1440                                  next_passes, next_to_clean,
1441                                  strategy_passes, strategy_buf_id,
1442                                  strategy_delta, bufs_to_lap);
1443 #endif
1444                 }
1445                 else if (next_passes == strategy_passes &&
1446                                  next_to_clean >= strategy_buf_id)
1447                 {
1448                         /* on same pass, but ahead or at least not behind */
1449                         bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
1450 #ifdef BGW_DEBUG
1451                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1452                                  next_passes, next_to_clean,
1453                                  strategy_passes, strategy_buf_id,
1454                                  strategy_delta, bufs_to_lap);
1455 #endif
1456                 }
1457                 else
1458                 {
1459                         /*
1460                          * We're behind, so skip forward to the strategy point and start
1461                          * cleaning from there.
1462                          */
1463 #ifdef BGW_DEBUG
1464                         elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
1465                                  next_passes, next_to_clean,
1466                                  strategy_passes, strategy_buf_id,
1467                                  strategy_delta);
1468 #endif
1469                         next_to_clean = strategy_buf_id;
1470                         next_passes = strategy_passes;
1471                         bufs_to_lap = NBuffers;
1472                 }
1473         }
1474         else
1475         {
1476                 /*
1477                  * Initializing at startup or after LRU scanning has been off. Always
1478                  * start at the strategy point.
1479                  */
1480 #ifdef BGW_DEBUG
1481                 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
1482                          strategy_passes, strategy_buf_id);
1483 #endif
1484                 strategy_delta = 0;
1485                 next_to_clean = strategy_buf_id;
1486                 next_passes = strategy_passes;
1487                 bufs_to_lap = NBuffers;
1488         }
1489
1490         /* Update saved info for next time */
1491         prev_strategy_buf_id = strategy_buf_id;
1492         prev_strategy_passes = strategy_passes;
1493         saved_info_valid = true;
1494
1495         /*
1496          * Compute how many buffers had to be scanned for each new allocation, ie,
1497          * 1/density of reusable buffers, and track a moving average of that.
1498          *
1499          * If the strategy point didn't move, we don't update the density estimate.
1500          */
1501         if (strategy_delta > 0 && recent_alloc > 0)
1502         {
1503                 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
1504                 smoothed_density += (scans_per_alloc - smoothed_density) /
1505                         smoothing_samples;
1506         }
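        /*
         * Worked example (illustrative numbers, not from the source): if the
         * strategy point advanced strategy_delta = 1000 buffers while
         * recent_alloc = 100 allocations occurred, scans_per_alloc is 10, and
         * with smoothing_samples = 16 the smoothed_density moves 1/16th of the
         * way from its previous value toward 10.
         */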
1507
1508         /*
1509          * Estimate how many reusable buffers there are between the current
1510          * strategy point and where we've scanned ahead to, based on the smoothed
1511          * density estimate.
1512          */
1513         bufs_ahead = NBuffers - bufs_to_lap;
1514         reusable_buffers_est = (float) bufs_ahead / smoothed_density;
1515
1516         /*
1517          * Track a moving average of recent buffer allocations.  Here, rather than
1518          * a true average we want a fast-attack, slow-decline behavior: we
1519          * immediately follow any increase.
1520          */
1521         if (smoothed_alloc <= (float) recent_alloc)
1522                 smoothed_alloc = recent_alloc;
1523         else
1524                 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
1525                         smoothing_samples;
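        /*
         * Worked example (illustrative numbers, not from the source): if
         * smoothed_alloc is 120 and recent_alloc jumps to 200, smoothed_alloc
         * snaps immediately to 200; if recent_alloc then drops to 0, it decays
         * by only 200/16 = 12.5 on the next cycle.
         */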
1526
1527         /* Scale the estimate by a GUC to allow more aggressive tuning. */
1528         upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
1529
1530         /*
1531          * If recent_alloc remains at zero for many cycles, smoothed_alloc will
1532          * eventually underflow to zero, and the underflows produce annoying
1533          * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
1534          * zero, there's no point in tracking smaller and smaller values of
1535          * smoothed_alloc, so just reset it to exactly zero to avoid this
1536          * syndrome.  It will pop back up as soon as recent_alloc increases.
1537          */
1538         if (upcoming_alloc_est == 0)
1539                 smoothed_alloc = 0;
1540
1541         /*
1542          * Even in cases where there's been little or no buffer allocation
1543          * activity, we want to make a small amount of progress through the buffer
1544          * cache so that as many reusable buffers as possible are clean after an
1545          * idle period.
1546          *
1547          * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
1548          * the BGW will be called during the scan_whole_pool time; slice the
1549          * buffer pool into that many sections.
1550          */
1551         min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
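        /*
         * Worked example (assumed settings, for illustration only): with
         * NBuffers = 16384 (128MB of shared_buffers) and BgWriterDelay = 200ms,
         * the pool is sliced into 120000 / 200 = 600 rounds, so min_scan_buffers
         * comes out to 16384 / 600, i.e. about 27 buffers per bgwriter round.
         */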
1552
1553         if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
1554         {
1555 #ifdef BGW_DEBUG
1556                 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
1557                          upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
1558 #endif
1559                 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
1560         }
1561
1562         /*
1563          * Now write out dirty reusable buffers, working forward from the
1564          * next_to_clean point, until we have lapped the strategy scan, or cleaned
1565          * enough buffers to match our estimate of the next cycle's allocation
1566          * requirements, or hit the bgwriter_lru_maxpages limit.
1567          */
1568
1569         /* Make sure we can handle the pin inside SyncOneBuffer */
1570         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1571
1572         num_to_scan = bufs_to_lap;
1573         num_written = 0;
1574         reusable_buffers = reusable_buffers_est;
1575
1576         /* Execute the LRU scan */
1577         while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
1578         {
1579                 int                     buffer_state = SyncOneBuffer(next_to_clean, true);
1580
1581                 if (++next_to_clean >= NBuffers)
1582                 {
1583                         next_to_clean = 0;
1584                         next_passes++;
1585                 }
1586                 num_to_scan--;
1587
1588                 if (buffer_state & BUF_WRITTEN)
1589                 {
1590                         reusable_buffers++;
1591                         if (++num_written >= bgwriter_lru_maxpages)
1592                         {
1593                                 BgWriterStats.m_maxwritten_clean++;
1594                                 break;
1595                         }
1596                 }
1597                 else if (buffer_state & BUF_REUSABLE)
1598                         reusable_buffers++;
1599         }
1600
1601         BgWriterStats.m_buf_written_clean += num_written;
1602
1603 #ifdef BGW_DEBUG
1604         elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
1605                  recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
1606                  smoothed_density, reusable_buffers_est, upcoming_alloc_est,
1607                  bufs_to_lap - num_to_scan,
1608                  num_written,
1609                  reusable_buffers - reusable_buffers_est);
1610 #endif
1611
1612         /*
1613          * Consider the above scan as being like a new allocation scan.
1614          * Characterize its density and update the smoothed one based on it. This
1615          * effectively halves the moving average period in cases where both the
1616          * strategy and the background writer are doing some useful scanning,
1617          * which is helpful because a long memory isn't as desirable on the
1618          * density estimates.
1619          */
1620         new_strategy_delta = bufs_to_lap - num_to_scan;
1621         new_recent_alloc = reusable_buffers - reusable_buffers_est;
1622         if (new_strategy_delta > 0 && new_recent_alloc > 0)
1623         {
1624                 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
1625                 smoothed_density += (scans_per_alloc - smoothed_density) /
1626                         smoothing_samples;
1627
1628 #ifdef BGW_DEBUG
1629                 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
1630                          new_recent_alloc, new_strategy_delta,
1631                          scans_per_alloc, smoothed_density);
1632 #endif
1633         }
1634
1635         /* Return true if OK to hibernate */
1636         return (bufs_to_lap == 0 && recent_alloc == 0);
1637 }
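/*
 * Illustrative sketch only (an assumption, not the actual main loop in
 * postmaster/bgwriter.c): how a caller might act on BgBufferSync's
 * hibernation hint.  The function name and sleep lengths are ours; it is
 * kept under NOT_USED, following the convention used for the debugging
 * routines later in this file.
 */
#ifdef NOT_USED
static void
ExampleBgWriterLoop(void)
{
        for (;;)
        {
                bool            can_hibernate = BgBufferSync();

                if (can_hibernate)
                        pg_usleep(10 * 1000000L);       /* nothing to clean; sleep longer */
                else
                        pg_usleep(BgWriterDelay * 1000L);       /* normal bgwriter_delay pause */
        }
}
#endif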
1638
1639 /*
1640  * SyncOneBuffer -- process a single buffer during syncing.
1641  *
1642  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
1643  * buffers marked recently used, as these are not replacement candidates.
1644  *
1645  * Returns a bitmask containing the following flag bits:
1646  *      BUF_WRITTEN: we wrote the buffer.
1647  *      BUF_REUSABLE: buffer is available for replacement, ie, it has
1648  *              pin count 0 and usage count 0.
1649  *
1650  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
1651  * after locking it, but we don't care all that much.)
1652  *
1653  * Note: caller must have done ResourceOwnerEnlargeBuffers.
1654  */
1655 static int
1656 SyncOneBuffer(int buf_id, bool skip_recently_used)
1657 {
1658         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1659         int                     result = 0;
1660
1661         /*
1662          * Check whether buffer needs writing.
1663          *
1664          * We can make this check without taking the buffer content lock so long
1665          * as we mark pages dirty in access methods *before* logging changes with
1666          * XLogInsert(): if someone marks the buffer dirty just after our check, we
1667          * don't worry, because the checkpoint's redo pointer precedes the log record
1668          * for the upcoming changes, so we need not write such a dirty buffer.
1669          */
1670         LockBufHdr(bufHdr);
1671
1672         if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
1673                 result |= BUF_REUSABLE;
1674         else if (skip_recently_used)
1675         {
1676                 /* Caller told us not to write recently-used buffers */
1677                 UnlockBufHdr(bufHdr);
1678                 return result;
1679         }
1680
1681         if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
1682         {
1683                 /* It's clean, so nothing to do */
1684                 UnlockBufHdr(bufHdr);
1685                 return result;
1686         }
1687
1688         /*
1689          * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
1690          * buffer is clean by the time we've locked it.)
1691          */
1692         PinBuffer_Locked(bufHdr);
1693         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
1694
1695         FlushBuffer(bufHdr, NULL);
1696
1697         LWLockRelease(bufHdr->content_lock);
1698         UnpinBuffer(bufHdr, true);
1699
1700         return result | BUF_WRITTEN;
1701 }
1702
1703 /*
1704  *              AtEOXact_Buffers - clean up at end of transaction.
1705  */
1706 void
1707 AtEOXact_Buffers(bool isCommit)
1708 {
1709         CheckForBufferLeaks();
1710
1711         AtEOXact_LocalBuffers(isCommit);
1712 }
1713
1714 /*
1715  * InitBufferPoolBackend --- second-stage initialization of a new backend
1716  *
1717  * This is called after we have acquired a PGPROC and so can safely get
1718  * LWLocks.  We don't currently need to do anything at this stage ...
1719  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
1720  * access, and thereby has to be called at the corresponding phase of
1721  * backend shutdown.
1722  */
1723 void
1724 InitBufferPoolBackend(void)
1725 {
1726         on_shmem_exit(AtProcExit_Buffers, 0);
1727 }
1728
1729 /*
1730  * During backend exit, ensure that we released all shared-buffer locks and
1731  * assert that we have no remaining pins.
1732  */
1733 static void
1734 AtProcExit_Buffers(int code, Datum arg)
1735 {
1736         AbortBufferIO();
1737         UnlockBuffers();
1738
1739         CheckForBufferLeaks();
1740
1741         /* localbuf.c needs a chance too */
1742         AtProcExit_LocalBuffers();
1743 }
1744
1745 /*
1746  *              CheckForBufferLeaks - ensure this backend holds no buffer pins
1747  *
1748  *              As of PostgreSQL 8.0, buffer pins should get released by the
1749  *              ResourceOwner mechanism.  This routine is just a debugging
1750  *              cross-check that no pins remain.
1751  */
1752 static void
1753 CheckForBufferLeaks(void)
1754 {
1755 #ifdef USE_ASSERT_CHECKING
1756         int                     RefCountErrors = 0;
1757         Buffer          b;
1758
1759         for (b = 1; b <= NBuffers; b++)
1760         {
1761                 if (PrivateRefCount[b - 1] != 0)
1762                 {
1763                         PrintBufferLeakWarning(b);
1764                         RefCountErrors++;
1765                 }
1766         }
1767         Assert(RefCountErrors == 0);
1768 #endif
1769 }
1770
1771 /*
1772  * Helper routine to issue warnings when a buffer is unexpectedly pinned
1773  */
1774 void
1775 PrintBufferLeakWarning(Buffer buffer)
1776 {
1777         volatile BufferDesc *buf;
1778         int32           loccount;
1779         char       *path;
1780         BackendId       backend;
1781
1782         Assert(BufferIsValid(buffer));
1783         if (BufferIsLocal(buffer))
1784         {
1785                 buf = &LocalBufferDescriptors[-buffer - 1];
1786                 loccount = LocalRefCount[-buffer - 1];
1787                 backend = MyBackendId;
1788         }
1789         else
1790         {
1791                 buf = &BufferDescriptors[buffer - 1];
1792                 loccount = PrivateRefCount[buffer - 1];
1793                 backend = InvalidBackendId;
1794         }
1795
1796         /* theoretically we should lock the bufhdr here */
1797         path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
1798         elog(WARNING,
1799                  "buffer refcount leak: [%03d] "
1800                  "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
1801                  buffer, path,
1802                  buf->tag.blockNum, buf->flags,
1803                  buf->refcount, loccount);
1804         pfree(path);
1805 }
1806
1807 /*
1808  * CheckPointBuffers
1809  *
1810  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
1811  *
1812  * Note: temporary relations do not participate in checkpoints, so they don't
1813  * need to be flushed.
1814  */
1815 void
1816 CheckPointBuffers(int flags)
1817 {
1818         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
1819         CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
1820         BufferSync(flags);
1821         CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
1822         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
1823         smgrsync();
1824         CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
1825         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
1826 }
1827
1828
1829 /*
1830  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
1831  */
1832 void
1833 BufmgrCommit(void)
1834 {
1835         /* Nothing to do in bufmgr anymore... */
1836 }
1837
1838 /*
1839  * BufferGetBlockNumber
1840  *              Returns the block number associated with a buffer.
1841  *
1842  * Note:
1843  *              Assumes that the buffer is valid and pinned, else the
1844  *              value may be obsolete immediately...
1845  */
1846 BlockNumber
1847 BufferGetBlockNumber(Buffer buffer)
1848 {
1849         volatile BufferDesc *bufHdr;
1850
1851         Assert(BufferIsPinned(buffer));
1852
1853         if (BufferIsLocal(buffer))
1854                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1855         else
1856                 bufHdr = &BufferDescriptors[buffer - 1];
1857
1858         /* pinned, so OK to read tag without spinlock */
1859         return bufHdr->tag.blockNum;
1860 }
1861
1862 /*
1863  * BufferGetTag
1864  *              Returns the relfilenode, fork number and block number associated with
1865  *              a buffer.
1866  */
1867 void
1868 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
1869                          BlockNumber *blknum)
1870 {
1871         volatile BufferDesc *bufHdr;
1872
1873         /* Do the same checks as BufferGetBlockNumber. */
1874         Assert(BufferIsPinned(buffer));
1875
1876         if (BufferIsLocal(buffer))
1877                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1878         else
1879                 bufHdr = &BufferDescriptors[buffer - 1];
1880
1881         /* pinned, so OK to read tag without spinlock */
1882         *rnode = bufHdr->tag.rnode;
1883         *forknum = bufHdr->tag.forkNum;
1884         *blknum = bufHdr->tag.blockNum;
1885 }
1886
1887 /*
1888  * FlushBuffer
1889  *              Physically write out a shared buffer.
1890  *
1891  * NOTE: this actually just passes the buffer contents to the kernel; the
1892  * real write to disk won't happen until the kernel feels like it.  This
1893  * is okay from our point of view since we can redo the changes from WAL.
1894  * However, we will need to force the changes to disk via fsync before
1895  * we can checkpoint WAL.
1896  *
1897  * The caller must hold a pin on the buffer and have share-locked the
1898  * buffer contents.  (Note: a share-lock does not prevent updates of
1899  * hint bits in the buffer, so the page could change while the write
1900  * is in progress, but we assume that that will not invalidate the data
1901  * written.)
1902  *
1903  * If the caller has an smgr reference for the buffer's relation, pass it
1904  * as the second parameter.  If not, pass NULL.
1905  */
1906 static void
1907 FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1908 {
1909         XLogRecPtr      recptr;
1910         ErrorContextCallback errcallback;
1911         instr_time      io_start,
1912                                 io_time;
1913         Block           bufBlock;
1914         char       *bufToWrite;
1915
1916         /*
1917          * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
1918          * false, then someone else flushed the buffer before we could, so we need
1919          * not do anything.
1920          */
1921         if (!StartBufferIO(buf, false))
1922                 return;
1923
1924         /* Setup error traceback support for ereport() */
1925         errcallback.callback = shared_buffer_write_error_callback;
1926         errcallback.arg = (void *) buf;
1927         errcallback.previous = error_context_stack;
1928         error_context_stack = &errcallback;
1929
1930         /* Find smgr relation for buffer */
1931         if (reln == NULL)
1932                 reln = smgropen(buf->tag.rnode, InvalidBackendId);
1933
1934         TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
1935                                                                                 buf->tag.blockNum,
1936                                                                                 reln->smgr_rnode.node.spcNode,
1937                                                                                 reln->smgr_rnode.node.dbNode,
1938                                                                                 reln->smgr_rnode.node.relNode);
1939
1940         LockBufHdr(buf);
1941
1942         /*
1943          * Run PageGetLSN while holding header lock, since we don't have the
1944          * buffer locked exclusively in all cases.
1945          */
1946         recptr = BufferGetLSN(buf);
1947
1948         /* To check if block content changes while flushing. - vadim 01/17/97 */
1949         buf->flags &= ~BM_JUST_DIRTIED;
1950         UnlockBufHdr(buf);
1951
1952         /*
1953          * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
1954          * rule that log updates must hit disk before any of the data-file changes
1955          * they describe do.
1956          *
1957          * However, this rule does not apply to unlogged relations, which will be
1958          * lost after a crash anyway.  Most unlogged relation pages do not bear
1959          * LSNs since we never emit WAL records for them, and therefore flushing
1960          * up through the buffer LSN would be useless, but harmless.  However,
1961          * GiST indexes use LSNs internally to track page-splits, and therefore
1962          * unlogged GiST pages bear "fake" LSNs generated by
1963          * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
1964          * LSN counter could advance past the WAL insertion point; and if it did
1965          * happen, attempting to flush WAL through that location would fail, with
1966          * disastrous system-wide consequences.  To make sure that can't happen,
1967          * skip the flush if the buffer isn't permanent.
1968          */
1969         if (buf->flags & BM_PERMANENT)
1970                 XLogFlush(recptr);
1971
1972         /*
1973          * Now it's safe to write buffer to disk. Note that no one else should
1974          * have been able to write it while we were busy with log flushing because
1975          * we have the io_in_progress lock.
1976          */
1977         bufBlock = BufHdrGetBlock(buf);
1978
1979         /*
1980          * Update page checksum if desired.  Since we have only shared lock on the
1981          * buffer, other processes might be updating hint bits in it, so we must
1982          * copy the page to private storage if we do checksumming.
1983          */
1984         bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
1985
1986         if (track_io_timing)
1987                 INSTR_TIME_SET_CURRENT(io_start);
1988
1989         /*
1990          * bufToWrite is either the shared buffer or a copy, as appropriate.
1991          */
1992         smgrwrite(reln,
1993                           buf->tag.forkNum,
1994                           buf->tag.blockNum,
1995                           bufToWrite,
1996                           false);
1997
1998         if (track_io_timing)
1999         {
2000                 INSTR_TIME_SET_CURRENT(io_time);
2001                 INSTR_TIME_SUBTRACT(io_time, io_start);
2002                 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2003                 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2004         }
2005
2006         pgBufferUsage.shared_blks_written++;
2007
2008         /*
2009          * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2010          * end the io_in_progress state.
2011          */
2012         TerminateBufferIO(buf, true, 0);
2013
2014         TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2015                                                                            buf->tag.blockNum,
2016                                                                            reln->smgr_rnode.node.spcNode,
2017                                                                            reln->smgr_rnode.node.dbNode,
2018                                                                            reln->smgr_rnode.node.relNode);
2019
2020         /* Pop the error context stack */
2021         error_context_stack = errcallback.previous;
2022 }
2023
2024 /*
2025  * RelationGetNumberOfBlocksInFork
2026  *              Determines the current number of pages in the specified relation fork.
2027  */
2028 BlockNumber
2029 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2030 {
2031         /* Open it at the smgr level if not already done */
2032         RelationOpenSmgr(relation);
2033
2034         return smgrnblocks(relation->rd_smgr, forkNum);
2035 }
2036
2037 /*
2038  * BufferIsPermanent
2039  *              Determines whether a buffer will potentially still be around after
2040  *              a crash.  Caller must hold a buffer pin.
2041  */
2042 bool
2043 BufferIsPermanent(Buffer buffer)
2044 {
2045         volatile BufferDesc *bufHdr;
2046
2047         /* Local buffers are used only for temp relations. */
2048         if (BufferIsLocal(buffer))
2049                 return false;
2050
2051         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2052         Assert(BufferIsValid(buffer));
2053         Assert(BufferIsPinned(buffer));
2054
2055         /*
2056          * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2057          * need not bother with the buffer header spinlock.  Even if someone else
2058          * changes the buffer header flags while we're doing this, we assume that
2059          * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
2060          * old value or the new value, but not random garbage.
2061          */
2062         bufHdr = &BufferDescriptors[buffer - 1];
2063         return (bufHdr->flags & BM_PERMANENT) != 0;
2064 }
2065
2066 /*
2067  * BufferGetLSNAtomic
2068  *              Retrieves the LSN of the buffer atomically using a buffer header lock.
2069  *              This is necessary for some callers who may not have an exclusive lock
2070  *              on the buffer.
2071  */
2072 XLogRecPtr
2073 BufferGetLSNAtomic(Buffer buffer)
2074 {
2075         volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
2076         char       *page = BufferGetPage(buffer);
2077         XLogRecPtr      lsn;
2078
2079         /*
2080          * If we don't need locking for correctness, fastpath out.
2081          */
2082         if (!DataChecksumsEnabled() || BufferIsLocal(buffer))
2083                 return PageGetLSN(page);
2084
2085         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2086         Assert(BufferIsValid(buffer));
2087         Assert(BufferIsPinned(buffer));
2088
2089         LockBufHdr(bufHdr);
2090         lsn = PageGetLSN(page);
2091         UnlockBufHdr(bufHdr);
2092
2093         return lsn;
2094 }
2095
2096 /* ---------------------------------------------------------------------
2097  *              DropRelFileNodeBuffers
2098  *
2099  *              This function removes from the buffer pool all the pages of the
2100  *              specified relation fork that have block numbers >= firstDelBlock.
2101  *              (In particular, with firstDelBlock = 0, all pages are removed.)
2102  *              Dirty pages are simply dropped, without bothering to write them
2103  *              out first.  Therefore, this is NOT rollback-able, and so should be
2104  *              used only with extreme caution!
2105  *
2106  *              Currently, this is called only from smgr.c when the underlying file
2107  *              is about to be deleted or truncated (firstDelBlock is needed for
2108  *              the truncation case).  The data in the affected pages would therefore
2109  *              be deleted momentarily anyway, and there is no point in writing it.
2110  *              It is the responsibility of higher-level code to ensure that the
2111  *              deletion or truncation does not lose any data that could be needed
2112  *              later.  It is also the responsibility of higher-level code to ensure
2113  *              that no other process could be trying to load more pages of the
2114  *              relation into buffers.
2115  *
2116  *              XXX currently it sequentially searches the buffer pool, should be
2117  *              changed to more clever ways of searching.  However, this routine
2118  *              is used only in code paths that aren't very performance-critical,
2119  *              and we shouldn't slow down the hot paths to make it faster ...
2120  * --------------------------------------------------------------------
2121  */
2122 void
2123 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2124                                            BlockNumber firstDelBlock)
2125 {
2126         int                     i;
2127
2128         /* If it's a local relation, it's localbuf.c's problem. */
2129         if (RelFileNodeBackendIsTemp(rnode))
2130         {
2131                 if (rnode.backend == MyBackendId)
2132                         DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2133                 return;
2134         }
2135
2136         for (i = 0; i < NBuffers; i++)
2137         {
2138                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2139
2140                 /*
2141                  * We can make this a tad faster by prechecking the buffer tag before
2142                  * we attempt to lock the buffer; this saves a lot of lock
2143                  * acquisitions in typical cases.  It should be safe because the
2144                  * caller must have AccessExclusiveLock on the relation, or some other
2145                  * reason to be certain that no one is loading new pages of the rel
2146                  * into the buffer pool.  (Otherwise we might well miss such pages
2147                  * entirely.)  Therefore, while the tag might be changing while we
2148                  * look at it, it can't be changing *to* a value we care about, only
2149                  * *away* from such a value.  So false negatives are impossible, and
2150                  * false positives are safe because we'll recheck after getting the
2151                  * buffer lock.
2152                  *
2153                  * We could check forkNum and blockNum as well as the rnode, but the
2154                  * incremental win from doing so seems small.
2155                  */
2156                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2157                         continue;
2158
2159                 LockBufHdr(bufHdr);
2160                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2161                         bufHdr->tag.forkNum == forkNum &&
2162                         bufHdr->tag.blockNum >= firstDelBlock)
2163                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2164                 else
2165                         UnlockBufHdr(bufHdr);
2166         }
2167 }
2168
2169 /* ---------------------------------------------------------------------
2170  *              DropRelFileNodesAllBuffers
2171  *
2172  *              This function removes from the buffer pool all the pages of all
2173  *              forks of the specified relations.  It's equivalent to calling
2174  *              DropRelFileNodeBuffers once per fork per relation with
2175  *              firstDelBlock = 0.
2176  * --------------------------------------------------------------------
2177  */
2178 void
2179 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2180 {
2181         int                     i,
2182                                 n = 0;
2183         RelFileNode *nodes;
2184         bool            use_bsearch;
2185
2186         if (nnodes == 0)
2187                 return;
2188
2189         nodes = palloc(sizeof(RelFileNode) * nnodes);           /* non-local relations */
2190
2191         /* If it's a local relation, it's localbuf.c's problem. */
2192         for (i = 0; i < nnodes; i++)
2193         {
2194                 if (RelFileNodeBackendIsTemp(rnodes[i]))
2195                 {
2196                         if (rnodes[i].backend == MyBackendId)
2197                                 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2198                 }
2199                 else
2200                         nodes[n++] = rnodes[i].node;
2201         }
2202
2203         /*
2204          * If there are no non-local relations, then we're done. Release the
2205          * memory and return.
2206          */
2207         if (n == 0)
2208         {
2209                 pfree(nodes);
2210                 return;
2211         }
2212
2213         /*
2214          * For a small number of relations to drop, just use a simple walk-through
2215          * to save the bsearch overhead.  The threshold is more of a guess than an
2216          * exactly determined value, as it depends on many factors (CPU and RAM
2217          * speeds, amount of shared buffers, etc.).
2218          */
2219         use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
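        /*
         * Rough illustration (assumed numbers): dropping 3 relations costs at
         * most 3 equality checks per buffer with the linear walk, while dropping
         * 100 relations would cost up to 100; the sorted bsearch needs only about
         * log2(100), i.e. roughly 7, comparisons per buffer, which is why larger
         * batches switch strategies.
         */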
2220
2221         /* sort the list of rnodes if necessary */
2222         if (use_bsearch)
2223                 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2224
2225         for (i = 0; i < NBuffers; i++)
2226         {
2227                 RelFileNode *rnode = NULL;
2228                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2229
2230                 /*
2231                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2232                  * and saves some cycles.
2233                  */
2234
2235                 if (!use_bsearch)
2236                 {
2237                         int                     j;
2238
2239                         for (j = 0; j < n; j++)
2240                         {
2241                                 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
2242                                 {
2243                                         rnode = &nodes[j];
2244                                         break;
2245                                 }
2246                         }
2247                 }
2248                 else
2249                 {
2250                         rnode = bsearch((const void *) &(bufHdr->tag.rnode),
2251                                                         nodes, n, sizeof(RelFileNode),
2252                                                         rnode_comparator);
2253                 }
2254
2255                 /* buffer doesn't belong to any of the given relfilenodes; skip it */
2256                 if (rnode == NULL)
2257                         continue;
2258
2259                 LockBufHdr(bufHdr);
2260                 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
2261                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2262                 else
2263                         UnlockBufHdr(bufHdr);
2264         }
2265
2266         pfree(nodes);
2267 }
2268
2269 /* ---------------------------------------------------------------------
2270  *              DropDatabaseBuffers
2271  *
2272  *              This function removes all the buffers in the buffer cache for a
2273  *              particular database.  Dirty pages are simply dropped, without
2274  *              bothering to write them out first.  This is used when we destroy a
2275  *              database, to avoid trying to flush data to disk when the directory
2276  *              tree no longer exists.  Implementation is pretty similar to
2277  *              DropRelFileNodeBuffers() which is for destroying just one relation.
2278  * --------------------------------------------------------------------
2279  */
2280 void
2281 DropDatabaseBuffers(Oid dbid)
2282 {
2283         int                     i;
2284
2285         /*
2286          * We needn't consider local buffers, since by assumption the target
2287          * database isn't our own.
2288          */
2289
2290         for (i = 0; i < NBuffers; i++)
2291         {
2292                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2293
2294                 /*
2295                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2296                  * and saves some cycles.
2297                  */
2298                 if (bufHdr->tag.rnode.dbNode != dbid)
2299                         continue;
2300
2301                 LockBufHdr(bufHdr);
2302                 if (bufHdr->tag.rnode.dbNode == dbid)
2303                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2304                 else
2305                         UnlockBufHdr(bufHdr);
2306         }
2307 }
2308
2309 /* -----------------------------------------------------------------
2310  *              PrintBufferDescs
2311  *
2312  *              this function prints all the buffer descriptors, for debugging
2313  *              use only.
2314  * -----------------------------------------------------------------
2315  */
2316 #ifdef NOT_USED
2317 void
2318 PrintBufferDescs(void)
2319 {
2320         int                     i;
2321         volatile BufferDesc *buf = BufferDescriptors;
2322
2323         for (i = 0; i < NBuffers; ++i, ++buf)
2324         {
2325                 /* theoretically we should lock the bufhdr here */
2326                 elog(LOG,
2327                          "[%02d] (freeNext=%d, rel=%s, "
2328                          "blockNum=%u, flags=0x%x, refcount=%u %d)",
2329                          i, buf->freeNext,
2330                   relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2331                          buf->tag.blockNum, buf->flags,
2332                          buf->refcount, PrivateRefCount[i]);
2333         }
2334 }
2335 #endif
2336
2337 #ifdef NOT_USED
2338 void
2339 PrintPinnedBufs(void)
2340 {
2341         int                     i;
2342         volatile BufferDesc *buf = BufferDescriptors;
2343
2344         for (i = 0; i < NBuffers; ++i, ++buf)
2345         {
2346                 if (PrivateRefCount[i] > 0)
2347                 {
2348                         /* theoretically we should lock the bufhdr here */
2349                         elog(LOG,
2350                                  "[%02d] (freeNext=%d, rel=%s, "
2351                                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
2352                                  i, buf->freeNext,
2353                                  relpath(buf->tag.rnode, buf->tag.forkNum),
2354                                  buf->tag.blockNum, buf->flags,
2355                                  buf->refcount, PrivateRefCount[i]);
2356                 }
2357         }
2358 }
2359 #endif
2360
2361 /* ---------------------------------------------------------------------
2362  *              FlushRelationBuffers
2363  *
2364  *              This function writes all dirty pages of a relation out to disk
2365  *              (or more accurately, out to kernel disk buffers), ensuring that the
2366  *              kernel has an up-to-date view of the relation.
2367  *
2368  *              Generally, the caller should be holding AccessExclusiveLock on the
2369  *              target relation to ensure that no other backend is busy dirtying
2370  *              more blocks of the relation; the effects can't be expected to last
2371  *              after the lock is released.
2372  *
2373  *              XXX currently it sequentially searches the buffer pool, should be
2374  *              changed to more clever ways of searching.  This routine is not
2375  *              used in any performance-critical code paths, so it's not worth
2376  *              adding additional overhead to normal paths to make it go faster;
2377  *              but see also DropRelFileNodeBuffers.
2378  * --------------------------------------------------------------------
2379  */
2380 void
2381 FlushRelationBuffers(Relation rel)
2382 {
2383         int                     i;
2384         volatile BufferDesc *bufHdr;
2385
2386         /* Open rel at the smgr level if not already done */
2387         RelationOpenSmgr(rel);
2388
2389         if (RelationUsesLocalBuffers(rel))
2390         {
2391                 for (i = 0; i < NLocBuffer; i++)
2392                 {
2393                         bufHdr = &LocalBufferDescriptors[i];
2394                         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2395                                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2396                         {
2397                                 ErrorContextCallback errcallback;
2398                                 Page            localpage;
2399
2400                                 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
2401
2402                                 /* Setup error traceback support for ereport() */
2403                                 errcallback.callback = local_buffer_write_error_callback;
2404                                 errcallback.arg = (void *) bufHdr;
2405                                 errcallback.previous = error_context_stack;
2406                                 error_context_stack = &errcallback;
2407
2408                                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
2409
2410                                 smgrwrite(rel->rd_smgr,
2411                                                   bufHdr->tag.forkNum,
2412                                                   bufHdr->tag.blockNum,
2413                                                   localpage,
2414                                                   false);
2415
2416                                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
2417
2418                                 /* Pop the error context stack */
2419                                 error_context_stack = errcallback.previous;
2420                         }
2421                 }
2422
2423                 return;
2424         }
2425
2426         /* Make sure we can handle the pin inside the loop */
2427         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2428
2429         for (i = 0; i < NBuffers; i++)
2430         {
2431                 bufHdr = &BufferDescriptors[i];
2432
2433                 /*
2434                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2435                  * and saves some cycles.
2436                  */
2437                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
2438                         continue;
2439
2440                 LockBufHdr(bufHdr);
2441                 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2442                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2443                 {
2444                         PinBuffer_Locked(bufHdr);
2445                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2446                         FlushBuffer(bufHdr, rel->rd_smgr);
2447                         LWLockRelease(bufHdr->content_lock);
2448                         UnpinBuffer(bufHdr, true);
2449                 }
2450                 else
2451                         UnlockBufHdr(bufHdr);
2452         }
2453 }
2454
2455 /* ---------------------------------------------------------------------
2456  *              FlushDatabaseBuffers
2457  *
2458  *              This function writes all dirty pages of a database out to disk
2459  *              (or more accurately, out to kernel disk buffers), ensuring that the
2460  *              kernel has an up-to-date view of the database.
2461  *
2462  *              Generally, the caller should be holding an appropriate lock to ensure
2463  *              no other backend is active in the target database; otherwise more
2464  *              pages could get dirtied.
2465  *
2466  *              Note we don't worry about flushing any pages of temporary relations.
2467  *              It's assumed these wouldn't be interesting.
2468  * --------------------------------------------------------------------
2469  */
2470 void
2471 FlushDatabaseBuffers(Oid dbid)
2472 {
2473         int                     i;
2474         volatile BufferDesc *bufHdr;
2475
2476         /* Make sure we can handle the pin inside the loop */
2477         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2478
2479         for (i = 0; i < NBuffers; i++)
2480         {
2481                 bufHdr = &BufferDescriptors[i];
2482
2483                 /*
2484                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2485                  * and saves some cycles.
2486                  */
2487                 if (bufHdr->tag.rnode.dbNode != dbid)
2488                         continue;
2489
2490                 LockBufHdr(bufHdr);
2491                 if (bufHdr->tag.rnode.dbNode == dbid &&
2492                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2493                 {
2494                         PinBuffer_Locked(bufHdr);
2495                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2496                         FlushBuffer(bufHdr, NULL);
2497                         LWLockRelease(bufHdr->content_lock);
2498                         UnpinBuffer(bufHdr, true);
2499                 }
2500                 else
2501                         UnlockBufHdr(bufHdr);
2502         }
2503 }
2504
2505 /*
2506  * ReleaseBuffer -- release the pin on a buffer
2507  */
2508 void
2509 ReleaseBuffer(Buffer buffer)
2510 {
2511         volatile BufferDesc *bufHdr;
2512
2513         if (!BufferIsValid(buffer))
2514                 elog(ERROR, "bad buffer ID: %d", buffer);
2515
2516         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
2517
2518         if (BufferIsLocal(buffer))
2519         {
2520                 Assert(LocalRefCount[-buffer - 1] > 0);
2521                 LocalRefCount[-buffer - 1]--;
2522                 return;
2523         }
2524
2525         bufHdr = &BufferDescriptors[buffer - 1];
2526
2527         Assert(PrivateRefCount[buffer - 1] > 0);
2528
2529         if (PrivateRefCount[buffer - 1] > 1)
2530                 PrivateRefCount[buffer - 1]--;
2531         else
2532                 UnpinBuffer(bufHdr, false);
2533 }
2534
2535 /*
2536  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
2537  *
2538  * This is just a shorthand for a common combination.
2539  */
2540 void
2541 UnlockReleaseBuffer(Buffer buffer)
2542 {
2543         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2544         ReleaseBuffer(buffer);
2545 }
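
/*
 * A minimal usage sketch (hypothetical helper, illustration only) of the
 * common pattern this shorthand supports: read-and-pin, lock, inspect,
 * then release the content lock and pin in one call.
 */
#ifdef NOT_USED
static void
example_examine_page(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBuffer(rel, blkno);

        LockBuffer(buf, BUFFER_LOCK_SHARE);

        /* ... inspect the page via BufferGetPage(buf) ... */

        /* releases both the content lock and the pin */
        UnlockReleaseBuffer(buf);
}
#endif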
2546
2547 /*
2548  * IncrBufferRefCount
2549  *              Increment the pin count on a buffer that we have *already* pinned
2550  *              at least once.
2551  *
2552  *              This function cannot be used on a buffer we do not have pinned,
2553  *              because it doesn't change the shared buffer state.
2554  */
2555 void
2556 IncrBufferRefCount(Buffer buffer)
2557 {
2558         Assert(BufferIsPinned(buffer));
2559         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2560         ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
2561         if (BufferIsLocal(buffer))
2562                 LocalRefCount[-buffer - 1]++;
2563         else
2564                 PrivateRefCount[buffer - 1]++;
2565 }
2566
2567 /*
2568  * MarkBufferDirtyHint
2569  *
2570  *      Mark a buffer dirty for non-critical changes.
2571  *
2572  * This is essentially the same as MarkBufferDirty, except:
2573  *
2574  * 1. The caller does not write WAL; so if checksums are enabled, we may need
2575  *        to write an XLOG_HINT WAL record to protect against torn pages.
2576  * 2. The caller might have only share-lock instead of exclusive-lock on the
2577  *        buffer's content lock.
2578  * 3. This function does not guarantee that the buffer is always marked dirty
2579  *        (due to a race condition), so it cannot be used for important changes.
2580  */
2581 void
2582 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
2583 {
2584         volatile BufferDesc *bufHdr;
2585         Page            page = BufferGetPage(buffer);
2586
2587         if (!BufferIsValid(buffer))
2588                 elog(ERROR, "bad buffer ID: %d", buffer);
2589
2590         if (BufferIsLocal(buffer))
2591         {
2592                 MarkLocalBufferDirty(buffer);
2593                 return;
2594         }
2595
2596         bufHdr = &BufferDescriptors[buffer - 1];
2597
2598         Assert(PrivateRefCount[buffer - 1] > 0);
2599         /* here, either share or exclusive lock is OK */
2600         Assert(LWLockHeldByMe(bufHdr->content_lock));
2601
2602         /*
2603          * This routine might get called many times on the same page, if we are
2604          * making the first scan after commit of an xact that added/deleted many
2605          * tuples. So, be as quick as we can if the buffer is already dirty.  We
2606          * do this by not acquiring spinlock if it looks like the status bits are
2607          * already set.  Since we make this test unlocked, there's a chance we
2608          * might fail to notice that the flags have just been cleared, and fail
2609          * to reset them, due to memory-ordering issues.  But since this function
2610          * is only intended to be used in cases where failing to write out the
2611          * data would be harmless anyway, it doesn't really matter.
2612          */
2613         if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
2614                 (BM_DIRTY | BM_JUST_DIRTIED))
2615         {
2616                 XLogRecPtr      lsn = InvalidXLogRecPtr;
2617                 bool            dirtied = false;
2618                 bool            delayChkpt = false;
2619
2620                 /*
2621                  * If we need to protect hint bit updates from torn writes, WAL-log a
2622                  * full page image of the page. This full page image is only necessary
2623                  * if the hint bit update is the first change to the page since the
2624                  * last checkpoint.
2625                  *
2626                  * We don't check full_page_writes here because that logic is included
2627                  * when we call XLogInsert() since the value changes dynamically.
2628                  */
2629                 if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
2630                 {
2631                         /*
2632                          * If we're in recovery we cannot dirty a page because of a hint.
2633                          * We can still set the hint, just not dirty the page as a result, so
2634                          * the hint is lost when the page is evicted or we shut down.
2635                          *
2636                          * See src/backend/storage/page/README for longer discussion.
2637                          */
2638                         if (RecoveryInProgress())
2639                                 return;
2640
2641                         /*
2642                          * If the block is already dirty because we either made a change
2643                          * or set a hint already, then we don't need to write a full page
2644                          * image.  Note that aggressive cleaning of blocks dirtied by hint
2645                          * bit setting would increase the call rate. Bulk setting of hint
2646                          * bits would reduce the call rate...
2647                          *
2648                          * We must issue the WAL record before we mark the buffer dirty.
2649                          * Otherwise we might write the page before we write the WAL. That
2650                          * causes a race condition, since a checkpoint might occur between
2651                          * writing the WAL record and marking the buffer dirty. We solve
2652                          * that with a kluge, but one that is already in use during
2653                          * transaction commit to prevent race conditions. Basically, we
2654                          * simply prevent the checkpoint WAL record from being written
2655                          * until we have marked the buffer dirty. We don't start the
2656                          * checkpoint flush until we have marked dirty, so our checkpoint
2657                          * must flush the change to disk successfully or the checkpoint
2658                          * never gets written, in which case crash recovery will fix it.
2659                          *
2660                          * It's possible that we enter here without an xid, so it is
2661                          * essential that CreateCheckpoint waits for virtual transactions
2662                          * rather than full transaction IDs.
2663                          */
2664                         MyPgXact->delayChkpt = delayChkpt = true;
2665                         lsn = XLogSaveBufferForHint(buffer, buffer_std);
2666                 }
2667
2668                 LockBufHdr(bufHdr);
2669                 Assert(bufHdr->refcount > 0);
2670                 if (!(bufHdr->flags & BM_DIRTY))
2671                 {
2672                         dirtied = true;         /* Means "will be dirtied by this action" */
2673
2674                         /*
2675                          * Set the page LSN if we wrote a backup block. We aren't supposed
2676                          * to set this when only holding a share lock but as long as we
2677                          * serialise it somehow we're OK. We choose to set LSN while
2678                          * holding the buffer header lock, which causes any reader of an
2679                          * LSN who holds only a share lock to also obtain a buffer header
2680                          * lock before using PageGetLSN(), which is enforced in
2681                          * BufferGetLSNAtomic().
2682                          *
2683                          * If checksums are enabled, you might think we should reset the
2684                          * checksum here. That will happen when the page is written
2685                          * sometime later in this checkpoint cycle.
2686                          */
2687                         if (!XLogRecPtrIsInvalid(lsn))
2688                                 PageSetLSN(page, lsn);
2689                 }
2690                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
2691                 UnlockBufHdr(bufHdr);
2692
2693                 if (delayChkpt)
2694                         MyPgXact->delayChkpt = false;
2695
2696                 if (dirtied)
2697                 {
2698                         VacuumPageDirty++;
2699                         pgBufferUsage.shared_blks_dirtied++;
2700                         if (VacuumCostActive)
2701                                 VacuumCostBalance += VacuumCostPageDirty;
2702                 }
2703         }
2704 }
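
/*
 * A minimal usage sketch (hypothetical helper, illustration only): a hint-bit
 * setter holds at least a share lock on the buffer, updates the hint in
 * place, and then calls MarkBufferDirtyHint.  Passing buffer_std = true
 * assumes the page follows the standard page layout.
 */
#ifdef NOT_USED
static void
example_set_hint(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBuffer(rel, blkno);

        LockBuffer(buf, BUFFER_LOCK_SHARE);

        /* ... set a hint bit somewhere on BufferGetPage(buf), in place ... */

        MarkBufferDirtyHint(buf, true);

        UnlockReleaseBuffer(buf);
}
#endif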
2705
2706 /*
2707  * Release buffer content locks for shared buffers.
2708  *
2709  * Used to clean up after errors.
2710  *
2711  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
2712  * of releasing buffer content locks per se; the only thing we need to deal
2713  * with here is clearing any PIN_COUNT request that was in progress.
2714  */
2715 void
2716 UnlockBuffers(void)
2717 {
2718         volatile BufferDesc *buf = PinCountWaitBuf;
2719
2720         if (buf)
2721         {
2722                 LockBufHdr(buf);
2723
2724                 /*
2725                  * Don't complain if flag bit not set; it could have been reset but we
2726                  * got a cancel/die interrupt before getting the signal.
2727                  */
2728                 if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
2729                         buf->wait_backend_pid == MyProcPid)
2730                         buf->flags &= ~BM_PIN_COUNT_WAITER;
2731
2732                 UnlockBufHdr(buf);
2733
2734                 PinCountWaitBuf = NULL;
2735         }
2736 }
2737
2738 /*
2739  * Acquire or release the content_lock for the buffer.
2740  */
2741 void
2742 LockBuffer(Buffer buffer, int mode)
2743 {
2744         volatile BufferDesc *buf;
2745
2746         Assert(BufferIsValid(buffer));
2747         if (BufferIsLocal(buffer))
2748                 return;                                 /* local buffers need no lock */
2749
2750         buf = &(BufferDescriptors[buffer - 1]);
2751
2752         if (mode == BUFFER_LOCK_UNLOCK)
2753                 LWLockRelease(buf->content_lock);
2754         else if (mode == BUFFER_LOCK_SHARE)
2755                 LWLockAcquire(buf->content_lock, LW_SHARED);
2756         else if (mode == BUFFER_LOCK_EXCLUSIVE)
2757                 LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
2758         else
2759                 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
2760 }
2761
2762 /*
2763  * Acquire the content_lock for the buffer, but only if we don't have to wait.
2764  *
2765  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
2766  */
2767 bool
2768 ConditionalLockBuffer(Buffer buffer)
2769 {
2770         volatile BufferDesc *buf;
2771
2772         Assert(BufferIsValid(buffer));
2773         if (BufferIsLocal(buffer))
2774                 return true;                    /* act as though we got it */
2775
2776         buf = &(BufferDescriptors[buffer - 1]);
2777
2778         return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
2779 }
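
/*
 * A minimal usage sketch (hypothetical helper, illustration only): a caller
 * that would rather not block can try the conditional form first and either
 * fall back to a blocking acquire or skip the buffer.  The buffer is assumed
 * to be pinned already.
 */
#ifdef NOT_USED
static void
example_lock_without_blocking(Buffer buf)
{
        if (!ConditionalLockBuffer(buf))
        {
                /* lock not immediately available; block here, or give up instead */
                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
        }

        /* ... modify the page ... */

        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
#endif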
2780
2781 /*
2782  * LockBufferForCleanup - lock a buffer in preparation for deleting items
2783  *
2784  * Items may be deleted from a disk page only when the caller (a) holds an
2785  * exclusive lock on the buffer and (b) has observed that no other backend
2786  * holds a pin on the buffer.  If there is a pin, then the other backend
2787  * might have a pointer into the buffer (for example, a heapscan reference
2788  * to an item --- see README for more details).  It's OK if a pin is added
2789  * after the cleanup starts, however; the newly-arrived backend will be
2790  * unable to look at the page until we release the exclusive lock.
2791  *
2792  * To implement this protocol, a would-be deleter must pin the buffer and
2793  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
2794  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
2795  * it has successfully observed pin count = 1.
2796  */
2797 void
2798 LockBufferForCleanup(Buffer buffer)
2799 {
2800         volatile BufferDesc *bufHdr;
2801
2802         Assert(BufferIsValid(buffer));
2803         Assert(PinCountWaitBuf == NULL);
2804
2805         if (BufferIsLocal(buffer))
2806         {
2807                 /* There should be exactly one pin */
2808                 if (LocalRefCount[-buffer - 1] != 1)
2809                         elog(ERROR, "incorrect local pin count: %d",
2810                                  LocalRefCount[-buffer - 1]);
2811                 /* Nobody else to wait for */
2812                 return;
2813         }
2814
2815         /* There should be exactly one local pin */
2816         if (PrivateRefCount[buffer - 1] != 1)
2817                 elog(ERROR, "incorrect local pin count: %d",
2818                          PrivateRefCount[buffer - 1]);
2819
2820         bufHdr = &BufferDescriptors[buffer - 1];
2821
2822         for (;;)
2823         {
2824                 /* Try to acquire lock */
2825                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
2826                 LockBufHdr(bufHdr);
2827                 Assert(bufHdr->refcount > 0);
2828                 if (bufHdr->refcount == 1)
2829                 {
2830                         /* Successfully acquired exclusive lock with pincount 1 */
2831                         UnlockBufHdr(bufHdr);
2832                         return;
2833                 }
2834                 /* Failed, so mark myself as waiting for pincount 1 */
2835                 if (bufHdr->flags & BM_PIN_COUNT_WAITER)
2836                 {
2837                         UnlockBufHdr(bufHdr);
2838                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2839                         elog(ERROR, "multiple backends attempting to wait for pincount 1");
2840                 }
2841                 bufHdr->wait_backend_pid = MyProcPid;
2842                 bufHdr->flags |= BM_PIN_COUNT_WAITER;
2843                 PinCountWaitBuf = bufHdr;
2844                 UnlockBufHdr(bufHdr);
2845                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2846
2847                 /* Wait to be signaled by UnpinBuffer() */
2848                 if (InHotStandby)
2849                 {
2850                         /* Publish the bufid that Startup process waits on */
2851                         SetStartupBufferPinWaitBufId(buffer - 1);
2852                         /* Set alarm and then wait to be signaled by UnpinBuffer() */
2853                         ResolveRecoveryConflictWithBufferPin();
2854                         /* Reset the published bufid */
2855                         SetStartupBufferPinWaitBufId(-1);
2856                 }
2857                 else
2858                         ProcWaitForSignal();
2859
2860                 PinCountWaitBuf = NULL;
2861                 /* Loop back and try again */
2862         }
2863 }
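
/*
 * A minimal usage sketch (hypothetical helper, illustration only) of the
 * protocol described above, roughly as a would-be deleter such as VACUUM
 * uses it: pin first, then wait until we are the sole pin holder.
 */
#ifdef NOT_USED
static void
example_cleanup_page(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
                                                                                         RBM_NORMAL, NULL);

        /* blocks until we hold exclusive lock and ours is the only pin */
        LockBufferForCleanup(buf);

        /* ... it is now safe to remove items from the page ... */

        UnlockReleaseBuffer(buf);
}
#endif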
2864
2865 /*
2866  * Check called from RecoveryConflictInterrupt handler when Startup
2867  * process requests cancellation of all pin holders that are blocking it.
2868  */
2869 bool
2870 HoldingBufferPinThatDelaysRecovery(void)
2871 {
2872         int                     bufid = GetStartupBufferPinWaitBufId();
2873
2874         /*
2875          * If we get woken slowly then it's possible that the Startup process was
2876          * already woken by other backends before we got here.  It's also possible
2877          * that we got here via multiple interrupts, or interrupts at inappropriate
2878          * times, so make sure we do nothing if the bufid is not set.
2879          */
2880         if (bufid < 0)
2881                 return false;
2882
2883         if (PrivateRefCount[bufid] > 0)
2884                 return true;
2885
2886         return false;
2887 }
2888
2889 /*
2890  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
2891  *
2892  * We won't loop, but just check once to see if the pin count is OK.  If
2893  * not, return FALSE with no lock held.
2894  */
2895 bool
2896 ConditionalLockBufferForCleanup(Buffer buffer)
2897 {
2898         volatile BufferDesc *bufHdr;
2899
2900         Assert(BufferIsValid(buffer));
2901
2902         if (BufferIsLocal(buffer))
2903         {
2904                 /* There should be exactly one pin */
2905                 Assert(LocalRefCount[-buffer - 1] > 0);
2906                 if (LocalRefCount[-buffer - 1] != 1)
2907                         return false;
2908                 /* Nobody else to wait for */
2909                 return true;
2910         }
2911
2912         /* There should be exactly one local pin */
2913         Assert(PrivateRefCount[buffer - 1] > 0);
2914         if (PrivateRefCount[buffer - 1] != 1)
2915                 return false;
2916
2917         /* Try to acquire lock */
2918         if (!ConditionalLockBuffer(buffer))
2919                 return false;
2920
2921         bufHdr = &BufferDescriptors[buffer - 1];
2922         LockBufHdr(bufHdr);
2923         Assert(bufHdr->refcount > 0);
2924         if (bufHdr->refcount == 1)
2925         {
2926                 /* Successfully acquired exclusive lock with pincount 1 */
2927                 UnlockBufHdr(bufHdr);
2928                 return true;
2929         }
2930
2931         /* Failed, so release the lock */
2932         UnlockBufHdr(bufHdr);
2933         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2934         return false;
2935 }
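
/*
 * A minimal usage sketch (hypothetical helper, illustration only): an
 * opportunistic caller attempts the cleanup lock and simply skips the page
 * if it cannot be had immediately, much as lazy VACUUM treats busy pages.
 * The buffer is assumed to be pinned exactly once by this backend.
 */
#ifdef NOT_USED
static void
example_try_cleanup(Buffer buf)
{
        if (ConditionalLockBufferForCleanup(buf))
        {
                /* exclusive lock held and pin count is 1: cleanup is safe */
                /* ... prune or delete items here ... */
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        }
        else
        {
                /* page is busy; leave it for a later pass */
        }
}
#endif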
2936
2937
2938 /*
2939  *      Functions for buffer I/O handling
2940  *
2941  *      Note: We assume that nested buffer I/O never occurs;
2942  *      i.e., at most one io_in_progress lock is held per process.
2943  *
2944  *      Also note that these are used only for shared buffers, not local ones.
2945  */
2946
2947 /*
2948  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
2949  */
2950 static void
2951 WaitIO(volatile BufferDesc *buf)
2952 {
2953         /*
2954          * Changed to wait until there's no IO - Inoue 01/13/2000
2955          *
2956          * Note this is *necessary* because an error abort in the process doing
2957          * I/O could release the io_in_progress_lock prematurely. See
2958          * AbortBufferIO.
2959          */
2960         for (;;)
2961         {
2962                 BufFlags        sv_flags;
2963
2964                 /*
2965                  * It may not be necessary to acquire the spinlock to check the flag
2966                  * here, but since this test is essential for correctness, we'd better
2967                  * play it safe.
2968                  */
2969                 LockBufHdr(buf);
2970                 sv_flags = buf->flags;
2971                 UnlockBufHdr(buf);
2972                 if (!(sv_flags & BM_IO_IN_PROGRESS))
2973                         break;
2974                 LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
2975                 LWLockRelease(buf->io_in_progress_lock);
2976         }
2977 }
2978
2979 /*
2980  * StartBufferIO: begin I/O on this buffer
2981  *      (Assumptions)
2982  *      My process is executing no IO
2983  *      The buffer is Pinned
2984  *
2985  * In some scenarios there are race conditions in which multiple backends
2986  * could attempt the same I/O operation concurrently.  If someone else
2987  * has already started I/O on this buffer then we will block on the
2988  * io_in_progress lock until he's done.
2989  *
2990  * Input operations are only attempted on buffers that are not BM_VALID,
2991  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
2992  * so we can always tell if the work is already done.
2993  *
2994  * Returns TRUE if we successfully marked the buffer as I/O busy,
2995  * FALSE if someone else already did the work.
2996  */
2997 static bool
2998 StartBufferIO(volatile BufferDesc *buf, bool forInput)
2999 {
3000         Assert(!InProgressBuf);
3001
3002         for (;;)
3003         {
3004                 /*
3005                  * Grab the io_in_progress lock so that other processes can wait for
3006                  * me to finish the I/O.
3007                  */
3008                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3009
3010                 LockBufHdr(buf);
3011
3012                 if (!(buf->flags & BM_IO_IN_PROGRESS))
3013                         break;
3014
3015                 /*
3016                  * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3017                  * lock isn't held is if the process doing the I/O is recovering from
3018                  * an error (see AbortBufferIO).  If that's the case, we must wait for
3019                  * him to get unwedged.
3020                  */
3021                 UnlockBufHdr(buf);
3022                 LWLockRelease(buf->io_in_progress_lock);
3023                 WaitIO(buf);
3024         }
3025
3026         /* Once we get here, there is definitely no I/O active on this buffer */
3027
3028         if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
3029         {
3030                 /* someone else already did the I/O */
3031                 UnlockBufHdr(buf);
3032                 LWLockRelease(buf->io_in_progress_lock);
3033                 return false;
3034         }
3035
3036         buf->flags |= BM_IO_IN_PROGRESS;
3037
3038         UnlockBufHdr(buf);
3039
3040         InProgressBuf = buf;
3041         IsForInput = forInput;
3042
3043         return true;
3044 }
3045
3046 /*
3047  * TerminateBufferIO: release a buffer we were doing I/O on
3048  *      (Assumptions)
3049  *      My process is executing IO for the buffer
3050  *      BM_IO_IN_PROGRESS bit is set for the buffer
3051  *      We hold the buffer's io_in_progress lock
3052  *      The buffer is Pinned
3053  *
3054  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3055  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
3056  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
3057  * marking the buffer clean if it was re-dirtied while we were writing.
3058  *
3059  * set_flag_bits gets ORed into the buffer's flags.  It must include
3060  * BM_IO_ERROR in a failure case.  For successful completion it could
3061  * be 0, or BM_VALID if we just finished reading in the page.
3062  */
3063 static void
3064 TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
3065                                   int set_flag_bits)
3066 {
3067         Assert(buf == InProgressBuf);
3068
3069         LockBufHdr(buf);
3070
3071         Assert(buf->flags & BM_IO_IN_PROGRESS);
3072         buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3073         if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
3074                 buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3075         buf->flags |= set_flag_bits;
3076
3077         UnlockBufHdr(buf);
3078
3079         InProgressBuf = NULL;
3080
3081         LWLockRelease(buf->io_in_progress_lock);
3082 }
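
/*
 * A minimal sketch (hypothetical helper, illustration only) of how
 * StartBufferIO and TerminateBufferIO bracket the storage-manager call on
 * the read path, roughly as ReadBuffer_common does.  The buffer is assumed
 * to be pinned; error handling and zero_damaged_pages are omitted.
 */
#ifdef NOT_USED
static void
example_read_into_buffer(volatile BufferDesc *buf, SMgrRelation smgr)
{
        if (StartBufferIO(buf, true))
        {
                /* we won the right to perform the read */
                smgrread(smgr, buf->tag.forkNum, buf->tag.blockNum,
                                 (char *) BufHdrGetBlock(buf));

                /* mark the buffer valid and release the io_in_progress lock */
                TerminateBufferIO(buf, false, BM_VALID);
        }
        /* else some other backend already completed the I/O */
}
#endif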
3083
3084 /*
3085  * AbortBufferIO: Clean up any active buffer I/O after an error.
3086  *
3087  *      All LWLocks we might have held have been released,
3088  *      but we haven't yet released buffer pins, so the buffer is still pinned.
3089  *
3090  *      If I/O was in progress, we always set BM_IO_ERROR, even though it's
3091  *      possible the error condition wasn't related to the I/O.
3092  */
3093 void
3094 AbortBufferIO(void)
3095 {
3096         volatile BufferDesc *buf = InProgressBuf;
3097
3098         if (buf)
3099         {
3100                 /*
3101                  * Since LWLockReleaseAll has already been called, we're not holding
3102                  * the buffer's io_in_progress_lock. We have to re-acquire it so that
3103                  * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3104                  * buffer will be in a busy spin until we succeed in doing this.
3105                  */
3106                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3107
3108                 LockBufHdr(buf);
3109                 Assert(buf->flags & BM_IO_IN_PROGRESS);
3110                 if (IsForInput)
3111                 {
3112                         Assert(!(buf->flags & BM_DIRTY));
3113                         /* We'd better not think buffer is valid yet */
3114                         Assert(!(buf->flags & BM_VALID));
3115                         UnlockBufHdr(buf);
3116                 }
3117                 else
3118                 {
3119                         BufFlags        sv_flags;
3120
3121                         sv_flags = buf->flags;
3122                         Assert(sv_flags & BM_DIRTY);
3123                         UnlockBufHdr(buf);
3124                         /* Issue notice if this is not the first failure... */
3125                         if (sv_flags & BM_IO_ERROR)
3126                         {
3127                                 /* Buffer is pinned, so we can read tag without spinlock */
3128                                 char       *path;
3129
3130                                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
3131                                 ereport(WARNING,
3132                                                 (errcode(ERRCODE_IO_ERROR),
3133                                                  errmsg("could not write block %u of %s",
3134                                                                 buf->tag.blockNum, path),
3135                                                  errdetail("Multiple failures --- write error might be permanent.")));
3136                                 pfree(path);
3137                         }
3138                 }
3139                 TerminateBufferIO(buf, false, BM_IO_ERROR);
3140         }
3141 }
3142
3143 /*
3144  * Error context callback for errors occurring during shared buffer writes.
3145  */
3146 static void
3147 shared_buffer_write_error_callback(void *arg)
3148 {
3149         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3150
3151         /* Buffer is pinned, so we can read the tag without locking the spinlock */
3152         if (bufHdr != NULL)
3153         {
3154                 char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
3155
3156                 errcontext("writing block %u of relation %s",
3157                                    bufHdr->tag.blockNum, path);
3158                 pfree(path);
3159         }
3160 }
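
/*
 * A minimal sketch (hypothetical helper, illustration only): the callback is
 * pushed onto error_context_stack around the write, the way FlushBuffer
 * arranges it, so an error raised inside the write reports which block was
 * being written.
 */
#ifdef NOT_USED
static void
example_with_error_context(volatile BufferDesc *buf)
{
        ErrorContextCallback errcallback;

        /* push a context callback naming the block being written */
        errcallback.callback = shared_buffer_write_error_callback;
        errcallback.arg = (void *) buf;
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;

        /* ... perform the write that might elog(ERROR) ... */

        /* pop the error context stack */
        error_context_stack = errcallback.previous;
}
#endif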
3161
3162 /*
3163  * Error context callback for errors occurring during local buffer writes.
3164  */
3165 static void
3166 local_buffer_write_error_callback(void *arg)
3167 {
3168         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3169
3170         if (bufHdr != NULL)
3171         {
3172                 char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
3173                                                                                   bufHdr->tag.forkNum);
3174
3175                 errcontext("writing block %u of relation %s",
3176                                    bufHdr->tag.blockNum, path);
3177                 pfree(path);
3178         }
3179 }
3180
3181 /*
3182  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
3183  */
3184 static int
3185 rnode_comparator(const void *p1, const void *p2)
3186 {
3187         RelFileNode n1 = *(RelFileNode *) p1;
3188         RelFileNode n2 = *(RelFileNode *) p2;
3189
3190         if (n1.relNode < n2.relNode)
3191                 return -1;
3192         else if (n1.relNode > n2.relNode)
3193                 return 1;
3194
3195         if (n1.dbNode < n2.dbNode)
3196                 return -1;
3197         else if (n1.dbNode > n2.dbNode)
3198                 return 1;
3199
3200         if (n1.spcNode < n2.spcNode)
3201                 return -1;
3202         else if (n1.spcNode > n2.spcNode)
3203                 return 1;
3204         else
3205                 return 0;
3206 }
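
/*
 * A minimal sketch (hypothetical helper, illustration only): the comparator
 * is intended for sorting an array of RelFileNodes and then probing it with
 * bsearch, roughly as DropRelFileNodesAllBuffers does once the relation
 * count exceeds DROP_RELS_BSEARCH_THRESHOLD.
 */
#ifdef NOT_USED
static bool
example_rnode_in_set(RelFileNode *nodes, int nnodes, RelFileNode target)
{
        /* sort once ... */
        qsort(nodes, nnodes, sizeof(RelFileNode), rnode_comparator);

        /* ... then each membership test is O(log n) */
        return bsearch(&target, nodes, nnodes, sizeof(RelFileNode),
                                   rnode_comparator) != NULL;
}
#endif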