1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *              and pin it so that no one can destroy it while this process
20  *              is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *              The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *              freelist.c -- chooses victim for buffer replacement
29  *              buf_table.c -- manages the buffer lookup table
30  */
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "catalog/catalog.h"
37 #include "catalog/storage.h"
38 #include "common/relpath.h"
39 #include "executor/instrument.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/buf_internals.h"
45 #include "storage/bufmgr.h"
46 #include "storage/ipc.h"
47 #include "storage/proc.h"
48 #include "storage/smgr.h"
49 #include "storage/standby.h"
50 #include "utils/rel.h"
51 #include "utils/resowner_private.h"
52 #include "utils/timestamp.h"
53
54
55 /* Note: these two macros only work on shared buffers, not local ones! */
56 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
57 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
58
59 /* Note: this macro only works on local buffers, not shared ones! */
60 #define LocalBufHdrGetBlock(bufHdr) \
61         LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
62
63 /* Bits in SyncOneBuffer's return value */
64 #define BUF_WRITTEN                             0x01
65 #define BUF_REUSABLE                    0x02
66
67 #define DROP_RELS_BSEARCH_THRESHOLD             20
68
69 /* GUC variables */
70 bool            zero_damaged_pages = false;
71 int                     bgwriter_lru_maxpages = 100;
72 double          bgwriter_lru_multiplier = 2.0;
73 bool            track_io_timing = false;
74
75 /*
76  * How many buffers PrefetchBuffer callers should try to stay ahead of their
77  * ReadBuffer calls by.  This is maintained by the assign hook for
78  * effective_io_concurrency.  Zero means "never prefetch".
79  */
80 int                     target_prefetch_pages = 0;
81
82 /* local state for StartBufferIO and related functions */
83 static volatile BufferDesc *InProgressBuf = NULL;
84 static bool IsForInput;
85
86 /* local state for LockBufferForCleanup */
87 static volatile BufferDesc *PinCountWaitBuf = NULL;
88
89
90 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
91                                   ForkNumber forkNum, BlockNumber blockNum,
92                                   ReadBufferMode mode, BufferAccessStrategy strategy,
93                                   bool *hit);
94 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
95 static void PinBuffer_Locked(volatile BufferDesc *buf);
96 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
97 static void BufferSync(int flags);
98 static int      SyncOneBuffer(int buf_id, bool skip_recently_used);
99 static void WaitIO(volatile BufferDesc *buf);
100 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
101 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
102                                   int set_flag_bits);
103 static void shared_buffer_write_error_callback(void *arg);
104 static void local_buffer_write_error_callback(void *arg);
105 static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
106                         char relpersistence,
107                         ForkNumber forkNum,
108                         BlockNumber blockNum,
109                         BufferAccessStrategy strategy,
110                         bool *foundPtr);
111 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
112 static void AtProcExit_Buffers(int code, Datum arg);
113 static int      rnode_comparator(const void *p1, const void *p2);
114
115
116 /*
117  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
118  *
119  * This is named by analogy to ReadBuffer but doesn't actually allocate a
120  * buffer.      Instead it tries to ensure that a future ReadBuffer for the given
121  * block will not be delayed by the I/O.  Prefetching is optional.
122  * No-op if prefetching isn't compiled in.
123  */
124 void
125 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
126 {
127 #ifdef USE_PREFETCH
128         Assert(RelationIsValid(reln));
129         Assert(BlockNumberIsValid(blockNum));
130
131         /* Open it at the smgr level if not already done */
132         RelationOpenSmgr(reln);
133
134         if (RelationUsesLocalBuffers(reln))
135         {
136                 /* see comments in ReadBufferExtended */
137                 if (RELATION_IS_OTHER_TEMP(reln))
138                         ereport(ERROR,
139                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
140                                 errmsg("cannot access temporary tables of other sessions")));
141
142                 /* pass it off to localbuf.c */
143                 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
144         }
145         else
146         {
147                 BufferTag       newTag;         /* identity of requested block */
148                 uint32          newHash;        /* hash value for newTag */
149                 LWLockId        newPartitionLock;       /* buffer partition lock for it */
150                 int                     buf_id;
151
152                 /* create a tag so we can lookup the buffer */
153                 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
154                                            forkNum, blockNum);
155
156                 /* determine its hash code and partition lock ID */
157                 newHash = BufTableHashCode(&newTag);
158                 newPartitionLock = BufMappingPartitionLock(newHash);
159
160                 /* see if the block is in the buffer pool already */
161                 LWLockAcquire(newPartitionLock, LW_SHARED);
162                 buf_id = BufTableLookup(&newTag, newHash);
163                 LWLockRelease(newPartitionLock);
164
165                 /* If not in buffers, initiate prefetch */
166                 if (buf_id < 0)
167                         smgrprefetch(reln->rd_smgr, forkNum, blockNum);
168
169                 /*
170                  * If the block *is* in buffers, we do nothing.  This is not really
171                  * ideal: the block might be just about to be evicted, which would be
172                  * stupid since we know we are going to need it soon.  But the only
173                  * easy answer is to bump the usage_count, which does not seem like a
174                  * great solution: when the caller does ultimately touch the block,
175                  * usage_count would get bumped again, resulting in too much
176                  * favoritism for blocks that are involved in a prefetch sequence. A
177                  * real fix would involve some additional per-buffer state, and it's
178                  * not clear that there's enough of a problem to justify that.
179                  */
180         }
181 #endif   /* USE_PREFETCH */
182 }
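/*
 * A minimal sketch of how a caller might use PrefetchBuffer: issue the
 * prefetch some distance ahead of the block it is about to read, so the
 * kernel has a chance to complete the I/O before the ReadBuffer call.
 * The helper name is illustrative; target_prefetch_pages (above) is the
 * look-ahead distance real callers are expected to consult.
 */
static void
prefetch_ahead_sketch(Relation rel, BlockNumber start, BlockNumber nblocks)
{
	BlockNumber blkno;
	BlockNumber distance = (BlockNumber) target_prefetch_pages;

	for (blkno = start; blkno < start + nblocks; blkno++)
	{
		/* stay "distance" blocks ahead of the block we read next */
		if (blkno + distance < start + nblocks)
			PrefetchBuffer(rel, MAIN_FORKNUM, blkno + distance);

		/* read (and, for this sketch, immediately release) the block */
		ReleaseBuffer(ReadBuffer(rel, blkno));
	}
}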
183
184
185 /*
186  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
187  *              fork with RBM_NORMAL mode and default strategy.
188  */
189 Buffer
190 ReadBuffer(Relation reln, BlockNumber blockNum)
191 {
192         return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
193 }
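/*
 * A minimal read-only usage sketch (helper name and the page inspection are
 * illustrative only): ReadBuffer returns a pinned buffer; the caller is
 * responsible for content locking and for dropping the pin.
 */
static void
read_only_access_sketch(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);
	Page		page;

	/* share-lock the contents while we look at the page */
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buf);

	/* ... examine the page, e.g. count its line pointers ... */
	(void) PageGetMaxOffsetNumber(page);

	/* drop the content lock and the pin together */
	UnlockReleaseBuffer(buf);
}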
194
195 /*
196  * ReadBufferExtended -- returns a buffer containing the requested
197  *              block of the requested relation.  If the blknum
198  *              requested is P_NEW, extend the relation file and
199  *              allocate a new block.  (Caller is responsible for
200  *              ensuring that only one backend tries to extend a
201  *              relation at the same time!)
202  *
203  * Returns: the buffer number for the buffer containing
204  *              the block read.  The returned buffer has been pinned.
205  *              Does not return on error --- elog's instead.
206  *
207  * Assumes that reln has already been opened when this function is called.
208  *
209  * In RBM_NORMAL mode, the page is read from disk, and the page header is
210  * validated. An error is thrown if the page header is not valid.
211  *
212  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
213  * valid, the page is zeroed instead of throwing an error. This is intended
214  * for non-critical data, where the caller is prepared to repair errors.
215  *
216  * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
217  * with zeros instead of reading it from disk.  Useful when the caller is
218  * going to fill the page from scratch, since this saves I/O and avoids
219  * unnecessary failure if the page-on-disk has corrupt page headers.
220  * Caution: do not use this mode to read a page that is beyond the relation's
221  * current physical EOF; that is likely to cause problems in md.c when
222  * the page is modified and written out. P_NEW is OK, though.
223  *
224  * If strategy is not NULL, a nondefault buffer access strategy is used.
225  * See buffer/README for details.
226  */
227 Buffer
228 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
229                                    ReadBufferMode mode, BufferAccessStrategy strategy)
230 {
231         bool            hit;
232         Buffer          buf;
233
234         /* Open it at the smgr level if not already done */
235         RelationOpenSmgr(reln);
236
237         /*
238          * Reject attempts to read non-local temporary relations; we would be
239          * likely to get wrong data since we have no visibility into the owning
240          * session's local buffers.
241          */
242         if (RELATION_IS_OTHER_TEMP(reln))
243                 ereport(ERROR,
244                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
245                                  errmsg("cannot access temporary tables of other sessions")));
246
247         /*
248          * Read the buffer, and update pgstat counters to reflect a cache hit or
249          * miss.
250          */
251         pgstat_count_buffer_read(reln);
252         buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
253                                                         forkNum, blockNum, mode, strategy, &hit);
254         if (hit)
255                 pgstat_count_buffer_hit(reln);
256         return buf;
257 }
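/*
 * Sketch of a nondefault-mode read: a bulk scan over non-critical data that
 * might be damaged can combine a ring-buffer access strategy with
 * RBM_ZERO_ON_ERROR, accepting a zeroed page rather than an error when a
 * page header fails validation.  The helper name is illustrative, and a real
 * bulk scan would create the strategy once per scan rather than per block.
 */
static Buffer
tolerant_bulk_read_sketch(Relation rel, BlockNumber blkno)
{
	BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
	Buffer		buf;

	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
							 RBM_ZERO_ON_ERROR, strategy);
	FreeAccessStrategy(strategy);

	return buf;					/* caller still holds the pin */
}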
258
259
260 /*
261  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
262  *              a relcache entry for the relation.
263  *
264  * NB: At present, this function may only be used on permanent relations, which
265  * is OK, because we only use it during XLOG replay.  If in the future we
266  * want to use it on temporary or unlogged relations, we could pass additional
267  * parameters.
268  */
269 Buffer
270 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
271                                                   BlockNumber blockNum, ReadBufferMode mode,
272                                                   BufferAccessStrategy strategy)
273 {
274         bool            hit;
275
276         SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
277
278         Assert(InRecovery);
279
280         return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
281                                                          mode, strategy, &hit);
282 }
283
284
285 /*
286  * ReadBuffer_common -- common logic for all ReadBuffer variants
287  *
288  * *hit is set to true if the request was satisfied from shared buffer cache.
289  */
290 static Buffer
291 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
292                                   BlockNumber blockNum, ReadBufferMode mode,
293                                   BufferAccessStrategy strategy, bool *hit)
294 {
295         volatile BufferDesc *bufHdr;
296         Block           bufBlock;
297         bool            found;
298         bool            isExtend;
299         bool            isLocalBuf = SmgrIsTemp(smgr);
300
301         *hit = false;
302
303         /* Make sure we will have room to remember the buffer pin */
304         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
305
306         isExtend = (blockNum == P_NEW);
307
308         TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
309                                                                            smgr->smgr_rnode.node.spcNode,
310                                                                            smgr->smgr_rnode.node.dbNode,
311                                                                            smgr->smgr_rnode.node.relNode,
312                                                                            smgr->smgr_rnode.backend,
313                                                                            isExtend);
314
315         /* Substitute proper block number if caller asked for P_NEW */
316         if (isExtend)
317                 blockNum = smgrnblocks(smgr, forkNum);
318
319         if (isLocalBuf)
320         {
321                 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
322                 if (found)
323                         pgBufferUsage.local_blks_hit++;
324                 else
325                         pgBufferUsage.local_blks_read++;
326         }
327         else
328         {
329                 /*
330                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
331                  * not currently in memory.
332                  */
333                 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
334                                                          strategy, &found);
335                 if (found)
336                         pgBufferUsage.shared_blks_hit++;
337                 else
338                         pgBufferUsage.shared_blks_read++;
339         }
340
341         /* At this point we do NOT hold any locks. */
342
343         /* if it was already in the buffer pool, we're done */
344         if (found)
345         {
346                 if (!isExtend)
347                 {
348                         /* Just need to update stats before we exit */
349                         *hit = true;
350                         VacuumPageHit++;
351
352                         if (VacuumCostActive)
353                                 VacuumCostBalance += VacuumCostPageHit;
354
355                         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
356                                                                                           smgr->smgr_rnode.node.spcNode,
357                                                                                           smgr->smgr_rnode.node.dbNode,
358                                                                                           smgr->smgr_rnode.node.relNode,
359                                                                                           smgr->smgr_rnode.backend,
360                                                                                           isExtend,
361                                                                                           found);
362
363                         return BufferDescriptorGetBuffer(bufHdr);
364                 }
365
366                 /*
367                  * We get here only in the corner case where we are trying to extend
368                  * the relation but we found a pre-existing buffer marked BM_VALID.
369                  * This can happen because mdread doesn't complain about reads beyond
370                  * EOF (when zero_damaged_pages is ON) and so a previous attempt to
371                  * read a block beyond EOF could have left a "valid" zero-filled
372                  * buffer.      Unfortunately, we have also seen this case occurring
373                  * because of buggy Linux kernels that sometimes return an
374                  * lseek(SEEK_END) result that doesn't account for a recent write. In
375                  * that situation, the pre-existing buffer would contain valid data
376                  * that we don't want to overwrite.  Since the legitimate case should
377                  * always have left a zero-filled buffer, complain if not PageIsNew.
378                  */
379                 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
380                 if (!PageIsNew((Page) bufBlock))
381                         ereport(ERROR,
382                          (errmsg("unexpected data beyond EOF in block %u of relation %s",
383                                          blockNum, relpath(smgr->smgr_rnode, forkNum)),
384                           errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
385
386                 /*
387                  * We *must* do smgrextend before succeeding, else the page will not
388                  * be reserved by the kernel, and the next P_NEW call will decide to
389                  * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
390                  * call that BufferAlloc didn't, and proceed.
391                  */
392                 if (isLocalBuf)
393                 {
394                         /* Only need to adjust flags */
395                         Assert(bufHdr->flags & BM_VALID);
396                         bufHdr->flags &= ~BM_VALID;
397                 }
398                 else
399                 {
400                         /*
401                          * Loop to handle the very small possibility that someone re-sets
402                          * BM_VALID between our clearing it and StartBufferIO inspecting
403                          * it.
404                          */
405                         do
406                         {
407                                 LockBufHdr(bufHdr);
408                                 Assert(bufHdr->flags & BM_VALID);
409                                 bufHdr->flags &= ~BM_VALID;
410                                 UnlockBufHdr(bufHdr);
411                         } while (!StartBufferIO(bufHdr, true));
412                 }
413         }
414
415         /*
416          * if we have gotten to this point, we have allocated a buffer for the
417          * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
418          * if it's a shared buffer.
419          *
420          * Note: if smgrextend fails, we will end up with a buffer that is
421          * allocated but not marked BM_VALID.  P_NEW will still select the same
422          * block number (because the relation didn't get any longer on disk) and
423          * so future attempts to extend the relation will find the same buffer (if
424          * it's not been recycled) but come right back here to try smgrextend
425          * again.
426          */
427         Assert(!(bufHdr->flags & BM_VALID));            /* spinlock not needed */
428
429         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
430
431         if (isExtend)
432         {
433                 /* new buffers are zero-filled */
434                 MemSet((char *) bufBlock, 0, BLCKSZ);
435                 /* don't set checksum for all-zero page */
436                 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
437         }
438         else
439         {
440                 /*
441                  * Read in the page, unless the caller intends to overwrite it and
442                  * just wants us to allocate a buffer.
443                  */
444                 if (mode == RBM_ZERO)
445                         MemSet((char *) bufBlock, 0, BLCKSZ);
446                 else
447                 {
448                         instr_time      io_start,
449                                                 io_time;
450
451                         if (track_io_timing)
452                                 INSTR_TIME_SET_CURRENT(io_start);
453
454                         smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
455
456                         if (track_io_timing)
457                         {
458                                 INSTR_TIME_SET_CURRENT(io_time);
459                                 INSTR_TIME_SUBTRACT(io_time, io_start);
460                                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
461                                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
462                         }
463
464                         /* check for garbage data */
465                         if (!PageIsVerified((Page) bufBlock, blockNum))
466                         {
467                                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
468                                 {
469                                         ereport(WARNING,
470                                                         (errcode(ERRCODE_DATA_CORRUPTED),
471                                                          errmsg("invalid page in block %u of relation %s; zeroing out page",
472                                                                         blockNum,
473                                                                         relpath(smgr->smgr_rnode, forkNum))));
474                                         MemSet((char *) bufBlock, 0, BLCKSZ);
475                                 }
476                                 else
477                                         ereport(ERROR,
478                                                         (errcode(ERRCODE_DATA_CORRUPTED),
479                                                          errmsg("invalid page in block %u of relation %s",
480                                                                         blockNum,
481                                                                         relpath(smgr->smgr_rnode, forkNum))));
482                         }
483                 }
484         }
485
486         if (isLocalBuf)
487         {
488                 /* Only need to adjust flags */
489                 bufHdr->flags |= BM_VALID;
490         }
491         else
492         {
493                 /* Set BM_VALID, terminate IO, and wake up any waiters */
494                 TerminateBufferIO(bufHdr, false, BM_VALID);
495         }
496
497         VacuumPageMiss++;
498         if (VacuumCostActive)
499                 VacuumCostBalance += VacuumCostPageMiss;
500
501         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
502                                                                           smgr->smgr_rnode.node.spcNode,
503                                                                           smgr->smgr_rnode.node.dbNode,
504                                                                           smgr->smgr_rnode.node.relNode,
505                                                                           smgr->smgr_rnode.backend,
506                                                                           isExtend,
507                                                                           found);
508
509         return BufferDescriptorGetBuffer(bufHdr);
510 }
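/*
 * Sketch of relation extension with P_NEW, per the ReadBufferExtended
 * comments above.  The caller must already have ensured that only one
 * backend extends the relation at a time (normally via the relation
 * extension lock managed in lmgr.c), and WAL-logged code would also have to
 * log the page initialization; both are omitted here.  The helper name is
 * illustrative only.
 */
static Buffer
extend_relation_sketch(Relation rel)
{
	Buffer		buf;
	Page		page;

	buf = ReadBuffer(rel, P_NEW);	/* returns a pinned, zero-filled page */

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
	page = BufferGetPage(buf);
	PageInit(page, BufferGetPageSize(buf), 0);
	MarkBufferDirty(buf);
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	return buf;					/* still pinned */
}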
511
512 /*
513  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
514  *              buffer.  If no buffer exists already, selects a replacement
515  *              victim and evicts the old page, but does NOT read in new page.
516  *
517  * "strategy" can be a buffer replacement strategy object, or NULL for
518  * the default strategy.  The selected buffer's usage_count is advanced when
519  * using the default strategy, but otherwise possibly not (see PinBuffer).
520  *
521  * The returned buffer is pinned and is already marked as holding the
522  * desired page.  If it already did have the desired page, *foundPtr is
523  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
524  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
525  *
526  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
527  * we keep it for simplicity in ReadBuffer.
528  *
529  * No locks are held either at entry or exit.
530  */
531 static volatile BufferDesc *
532 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
533                         BlockNumber blockNum,
534                         BufferAccessStrategy strategy,
535                         bool *foundPtr)
536 {
537         BufferTag       newTag;                 /* identity of requested block */
538         uint32          newHash;                /* hash value for newTag */
539         LWLockId        newPartitionLock;               /* buffer partition lock for it */
540         BufferTag       oldTag;                 /* previous identity of selected buffer */
541         uint32          oldHash;                /* hash value for oldTag */
542         LWLockId        oldPartitionLock;               /* buffer partition lock for it */
543         BufFlags        oldFlags;
544         int                     buf_id;
545         volatile BufferDesc *buf;
546         bool            valid;
547
548         /* create a tag so we can lookup the buffer */
549         INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
550
551         /* determine its hash code and partition lock ID */
552         newHash = BufTableHashCode(&newTag);
553         newPartitionLock = BufMappingPartitionLock(newHash);
554
555         /* see if the block is in the buffer pool already */
556         LWLockAcquire(newPartitionLock, LW_SHARED);
557         buf_id = BufTableLookup(&newTag, newHash);
558         if (buf_id >= 0)
559         {
560                 /*
561                  * Found it.  Now, pin the buffer so no one can steal it from the
562                  * buffer pool, and check to see if the correct data has been loaded
563                  * into the buffer.
564                  */
565                 buf = &BufferDescriptors[buf_id];
566
567                 valid = PinBuffer(buf, strategy);
568
569                 /* Can release the mapping lock as soon as we've pinned it */
570                 LWLockRelease(newPartitionLock);
571
572                 *foundPtr = TRUE;
573
574                 if (!valid)
575                 {
576                         /*
577                          * We can only get here if (a) someone else is still reading in
578                          * the page, or (b) a previous read attempt failed.  We have to
579                          * wait for any active read attempt to finish, and then set up our
580                          * own read attempt if the page is still not BM_VALID.
581                          * StartBufferIO does it all.
582                          */
583                         if (StartBufferIO(buf, true))
584                         {
585                                 /*
586                                  * If we get here, previous attempts to read the buffer must
587                                  * have failed ... but we shall bravely try again.
588                                  */
589                                 *foundPtr = FALSE;
590                         }
591                 }
592
593                 return buf;
594         }
595
596         /*
597          * Didn't find it in the buffer pool.  We'll have to initialize a new
598          * buffer.      Remember to unlock the mapping lock while doing the work.
599          */
600         LWLockRelease(newPartitionLock);
601
602         /* Loop here in case we have to try another victim buffer */
603         for (;;)
604         {
605                 bool            lock_held;
606
607                 /*
608                  * Select a victim buffer.      The buffer is returned with its header
609                  * spinlock still held!  Also (in most cases) the BufFreelistLock is
610                  * still held, since it would be bad to hold the spinlock while
611                  * possibly waking up other processes.
612                  */
613                 buf = StrategyGetBuffer(strategy, &lock_held);
614
615                 Assert(buf->refcount == 0);
616
617                 /* Must copy buffer flags while we still hold the spinlock */
618                 oldFlags = buf->flags;
619
620                 /* Pin the buffer and then release the buffer spinlock */
621                 PinBuffer_Locked(buf);
622
623                 /* Now it's safe to release the freelist lock */
624                 if (lock_held)
625                         LWLockRelease(BufFreelistLock);
626
627                 /*
628                  * If the buffer was dirty, try to write it out.  There is a race
629                  * condition here, in that someone might dirty it after we released it
630                  * above, or even while we are writing it out (since our share-lock
631                  * won't prevent hint-bit updates).  We will recheck the dirty bit
632                  * after re-locking the buffer header.
633                  */
634                 if (oldFlags & BM_DIRTY)
635                 {
636                         /*
637                          * We need a share-lock on the buffer contents to write it out
638                          * (else we might write invalid data, eg because someone else is
639                          * compacting the page contents while we write).  We must use a
640                          * conditional lock acquisition here to avoid deadlock.  Even
641                          * though the buffer was not pinned (and therefore surely not
642                          * locked) when StrategyGetBuffer returned it, someone else could
643                          * have pinned and exclusive-locked it by the time we get here. If
644                          * we try to get the lock unconditionally, we'd block waiting for
645                          * them; if they later block waiting for us, deadlock ensues.
646                          * (This has been observed to happen when two backends are both
647                          * trying to split btree index pages, and the second one just
648                          * happens to be trying to split the page the first one got from
649                          * StrategyGetBuffer.)
650                          */
651                         if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
652                         {
653                                 /*
654                                  * If using a nondefault strategy, and writing the buffer
655                                  * would require a WAL flush, let the strategy decide whether
656                                  * to go ahead and write/reuse the buffer or to choose another
657                                  * victim.      We need lock to inspect the page LSN, so this
658                                  * can't be done inside StrategyGetBuffer.
659                                  */
660                                 if (strategy != NULL)
661                                 {
662                                         XLogRecPtr      lsn;
663
664                                         /* Read the LSN while holding buffer header lock */
665                                         LockBufHdr(buf);
666                                         lsn = BufferGetLSN(buf);
667                                         UnlockBufHdr(buf);
668
669                                         if (XLogNeedsFlush(lsn) &&
670                                                 StrategyRejectBuffer(strategy, buf))
671                                         {
672                                                 /* Drop lock/pin and loop around for another buffer */
673                                                 LWLockRelease(buf->content_lock);
674                                                 UnpinBuffer(buf, true);
675                                                 continue;
676                                         }
677                                 }
678
679                                 /* OK, do the I/O */
680                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
681                                                                                            smgr->smgr_rnode.node.spcNode,
682                                                                                                 smgr->smgr_rnode.node.dbNode,
683                                                                                           smgr->smgr_rnode.node.relNode);
684
685                                 FlushBuffer(buf, NULL);
686                                 LWLockRelease(buf->content_lock);
687
688                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
689                                                                                            smgr->smgr_rnode.node.spcNode,
690                                                                                                 smgr->smgr_rnode.node.dbNode,
691                                                                                           smgr->smgr_rnode.node.relNode);
692                         }
693                         else
694                         {
695                                 /*
696                                  * Someone else has locked the buffer, so give it up and loop
697                                  * back to get another one.
698                                  */
699                                 UnpinBuffer(buf, true);
700                                 continue;
701                         }
702                 }
703
704                 /*
705                  * To change the association of a valid buffer, we'll need to have
706                  * exclusive lock on both the old and new mapping partitions.
707                  */
708                 if (oldFlags & BM_TAG_VALID)
709                 {
710                         /*
711                          * Need to compute the old tag's hashcode and partition lock ID.
712                          * XXX is it worth storing the hashcode in BufferDesc so we need
713                          * not recompute it here?  Probably not.
714                          */
715                         oldTag = buf->tag;
716                         oldHash = BufTableHashCode(&oldTag);
717                         oldPartitionLock = BufMappingPartitionLock(oldHash);
718
719                         /*
720                          * Must lock the lower-numbered partition first to avoid
721                          * deadlocks.
722                          */
723                         if (oldPartitionLock < newPartitionLock)
724                         {
725                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
726                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
727                         }
728                         else if (oldPartitionLock > newPartitionLock)
729                         {
730                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
731                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
732                         }
733                         else
734                         {
735                                 /* only one partition, only one lock */
736                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
737                         }
738                 }
739                 else
740                 {
741                         /* if it wasn't valid, we need only the new partition */
742                         LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
743                         /* these just keep the compiler quiet about uninit variables */
744                         oldHash = 0;
745                         oldPartitionLock = 0;
746                 }
747
748                 /*
749                  * Try to make a hashtable entry for the buffer under its new tag.
750                  * This could fail because while we were writing someone else
751                  * allocated another buffer for the same block we want to read in.
752                  * Note that we have not yet removed the hashtable entry for the old
753                  * tag.
754                  */
755                 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
756
757                 if (buf_id >= 0)
758                 {
759                         /*
760                          * Got a collision. Someone has already done what we were about to
761                          * do. We'll just handle this as if it were found in the buffer
762                          * pool in the first place.  First, give up the buffer we were
763                          * planning to use.
764                          */
765                         UnpinBuffer(buf, true);
766
767                         /* Can give up that buffer's mapping partition lock now */
768                         if ((oldFlags & BM_TAG_VALID) &&
769                                 oldPartitionLock != newPartitionLock)
770                                 LWLockRelease(oldPartitionLock);
771
772                         /* remaining code should match code at top of routine */
773
774                         buf = &BufferDescriptors[buf_id];
775
776                         valid = PinBuffer(buf, strategy);
777
778                         /* Can release the mapping lock as soon as we've pinned it */
779                         LWLockRelease(newPartitionLock);
780
781                         *foundPtr = TRUE;
782
783                         if (!valid)
784                         {
785                                 /*
786                                  * We can only get here if (a) someone else is still reading
787                                  * in the page, or (b) a previous read attempt failed.  We
788                                  * have to wait for any active read attempt to finish, and
789                                  * then set up our own read attempt if the page is still not
790                                  * BM_VALID.  StartBufferIO does it all.
791                                  */
792                                 if (StartBufferIO(buf, true))
793                                 {
794                                         /*
795                                          * If we get here, previous attempts to read the buffer
796                                          * must have failed ... but we shall bravely try again.
797                                          */
798                                         *foundPtr = FALSE;
799                                 }
800                         }
801
802                         return buf;
803                 }
804
805                 /*
806                  * Need to lock the buffer header too in order to change its tag.
807                  */
808                 LockBufHdr(buf);
809
810                 /*
811                  * Somebody could have pinned or re-dirtied the buffer while we were
812                  * doing the I/O and making the new hashtable entry.  If so, we can't
813                  * recycle this buffer; we must undo everything we've done and start
814                  * over with a new victim buffer.
815                  */
816                 oldFlags = buf->flags;
817                 if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
818                         break;
819
820                 UnlockBufHdr(buf);
821                 BufTableDelete(&newTag, newHash);
822                 if ((oldFlags & BM_TAG_VALID) &&
823                         oldPartitionLock != newPartitionLock)
824                         LWLockRelease(oldPartitionLock);
825                 LWLockRelease(newPartitionLock);
826                 UnpinBuffer(buf, true);
827         }
828
829         /*
830          * Okay, it's finally safe to rename the buffer.
831          *
832          * Clearing BM_VALID here is necessary, clearing the dirtybits is just
833          * paranoia.  We also reset the usage_count since any recency of use of
834          * the old content is no longer relevant.  (The usage_count starts out at
835          * 1 so that the buffer can survive one clock-sweep pass.)
836          */
837         buf->tag = newTag;
838         buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
839         if (relpersistence == RELPERSISTENCE_PERMANENT)
840                 buf->flags |= BM_TAG_VALID | BM_PERMANENT;
841         else
842                 buf->flags |= BM_TAG_VALID;
843         buf->usage_count = 1;
844
845         UnlockBufHdr(buf);
846
847         if (oldFlags & BM_TAG_VALID)
848         {
849                 BufTableDelete(&oldTag, oldHash);
850                 if (oldPartitionLock != newPartitionLock)
851                         LWLockRelease(oldPartitionLock);
852         }
853
854         LWLockRelease(newPartitionLock);
855
856         /*
857          * Buffer contents are currently invalid.  Try to get the io_in_progress
858          * lock.  If StartBufferIO returns false, then someone else managed to
859          * read it before we did, so there's nothing left for BufferAlloc() to do.
860          */
861         if (StartBufferIO(buf, true))
862                 *foundPtr = FALSE;
863         else
864                 *foundPtr = TRUE;
865
866         return buf;
867 }
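/*
 * The deadlock-avoidance rule used above, distilled into a helper for
 * illustration (it is not called by the code in this file): when two buffer
 * mapping partitions must be held at once, always acquire the lower-numbered
 * partition lock first, and take the lock only once if both tags map to the
 * same partition.
 */
static void
acquire_two_partition_locks_sketch(LWLockId lock1, LWLockId lock2)
{
	if (lock1 < lock2)
	{
		LWLockAcquire(lock1, LW_EXCLUSIVE);
		LWLockAcquire(lock2, LW_EXCLUSIVE);
	}
	else if (lock2 < lock1)
	{
		LWLockAcquire(lock2, LW_EXCLUSIVE);
		LWLockAcquire(lock1, LW_EXCLUSIVE);
	}
	else
		LWLockAcquire(lock1, LW_EXCLUSIVE);		/* same partition: one lock */
}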
868
869 /*
870  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
871  * freelist.
872  *
873  * The buffer header spinlock must be held at entry.  We drop it before
874  * returning.  (This is sane because the caller must have locked the
875  * buffer in order to be sure it should be dropped.)
876  *
877  * This is used only in contexts such as dropping a relation.  We assume
878  * that no other backend could possibly be interested in using the page,
879  * so the only reason the buffer might be pinned is if someone else is
880  * trying to write it out.      We have to let them finish before we can
881  * reclaim the buffer.
882  *
883  * The buffer could get reclaimed by someone else while we are waiting
884  * to acquire the necessary locks; if so, don't mess it up.
885  */
886 static void
887 InvalidateBuffer(volatile BufferDesc *buf)
888 {
889         BufferTag       oldTag;
890         uint32          oldHash;                /* hash value for oldTag */
891         LWLockId        oldPartitionLock;               /* buffer partition lock for it */
892         BufFlags        oldFlags;
893
894         /* Save the original buffer tag before dropping the spinlock */
895         oldTag = buf->tag;
896
897         UnlockBufHdr(buf);
898
899         /*
900          * Need to compute the old tag's hashcode and partition lock ID. XXX is it
901          * worth storing the hashcode in BufferDesc so we need not recompute it
902          * here?  Probably not.
903          */
904         oldHash = BufTableHashCode(&oldTag);
905         oldPartitionLock = BufMappingPartitionLock(oldHash);
906
907 retry:
908
909         /*
910          * Acquire exclusive mapping lock in preparation for changing the buffer's
911          * association.
912          */
913         LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
914
915         /* Re-lock the buffer header */
916         LockBufHdr(buf);
917
918         /* If it's changed while we were waiting for lock, do nothing */
919         if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
920         {
921                 UnlockBufHdr(buf);
922                 LWLockRelease(oldPartitionLock);
923                 return;
924         }
925
926         /*
927          * We assume the only reason for it to be pinned is that someone else is
928          * flushing the page out.  Wait for them to finish.  (This could be an
929          * infinite loop if the refcount is messed up... it would be nice to time
930  * out after a while, but there seems no way to be sure how many loops may
931          * be needed.  Note that if the other guy has pinned the buffer but not
932          * yet done StartBufferIO, WaitIO will fall through and we'll effectively
933          * be busy-looping here.)
934          */
935         if (buf->refcount != 0)
936         {
937                 UnlockBufHdr(buf);
938                 LWLockRelease(oldPartitionLock);
939                 /* safety check: should definitely not be our *own* pin */
940                 if (PrivateRefCount[buf->buf_id] != 0)
941                         elog(ERROR, "buffer is pinned in InvalidateBuffer");
942                 WaitIO(buf);
943                 goto retry;
944         }
945
946         /*
947          * Clear out the buffer's tag and flags.  We must do this to ensure that
948          * linear scans of the buffer array don't think the buffer is valid.
949          */
950         oldFlags = buf->flags;
951         CLEAR_BUFFERTAG(buf->tag);
952         buf->flags = 0;
953         buf->usage_count = 0;
954
955         UnlockBufHdr(buf);
956
957         /*
958          * Remove the buffer from the lookup hashtable, if it was in there.
959          */
960         if (oldFlags & BM_TAG_VALID)
961                 BufTableDelete(&oldTag, oldHash);
962
963         /*
964          * Done with mapping lock.
965          */
966         LWLockRelease(oldPartitionLock);
967
968         /*
969          * Insert the buffer at the head of the list of free buffers.
970          */
971         StrategyFreeBuffer(buf);
972 }
973
974 /*
975  * MarkBufferDirty
976  *
977  *              Marks buffer contents as dirty (actual write happens later).
978  *
979  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
980  * exclusive lock, then somebody could be in process of writing the buffer,
981  * leading to risk of bad data written to disk.)
982  */
983 void
984 MarkBufferDirty(Buffer buffer)
985 {
986         volatile BufferDesc *bufHdr;
987
988         if (!BufferIsValid(buffer))
989                 elog(ERROR, "bad buffer ID: %d", buffer);
990
991         if (BufferIsLocal(buffer))
992         {
993                 MarkLocalBufferDirty(buffer);
994                 return;
995         }
996
997         bufHdr = &BufferDescriptors[buffer - 1];
998
999         Assert(PrivateRefCount[buffer - 1] > 0);
1000         /* unfortunately we can't check if the lock is held exclusively */
1001         Assert(LWLockHeldByMe(bufHdr->content_lock));
1002
1003         LockBufHdr(bufHdr);
1004
1005         Assert(bufHdr->refcount > 0);
1006
1007         /*
1008          * If the buffer was not dirty already, do vacuum accounting.
1009          */
1010         if (!(bufHdr->flags & BM_DIRTY))
1011         {
1012                 VacuumPageDirty++;
1013                 pgBufferUsage.shared_blks_dirtied++;
1014                 if (VacuumCostActive)
1015                         VacuumCostBalance += VacuumCostPageDirty;
1016         }
1017
1018         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1019
1020         UnlockBufHdr(bufHdr);
1021 }
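/*
 * The modify pattern the comment above assumes, as a sketch: pin the buffer,
 * take the exclusive content lock, change the page, call MarkBufferDirty
 * while still holding the lock, then unlock and unpin.  WAL-logged changes
 * would additionally wrap the modification and an XLogInsert call in a
 * critical section; that part is omitted.  The helper name is illustrative,
 * and PageSetFull stands in for whatever page change the caller makes.
 */
static void
modify_page_sketch(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);
	Page		page;

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	page = BufferGetPage(buf);
	PageSetFull(page);			/* stand-in for a real page modification */

	MarkBufferDirty(buf);		/* must hold the exclusive content lock */

	UnlockReleaseBuffer(buf);
}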
1022
1023 /*
1024  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1025  *
1026  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1027  * compared to calling the two routines separately.  Now it's mainly just
1028  * a convenience function.      However, if the passed buffer is valid and
1029  * already contains the desired block, we just return it as-is; and that
1030  * does save considerable work compared to a full release and reacquire.
1031  *
1032  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1033  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1034  * but can save some tests in the caller.
1035  */
1036 Buffer
1037 ReleaseAndReadBuffer(Buffer buffer,
1038                                          Relation relation,
1039                                          BlockNumber blockNum)
1040 {
1041         ForkNumber      forkNum = MAIN_FORKNUM;
1042         volatile BufferDesc *bufHdr;
1043
1044         if (BufferIsValid(buffer))
1045         {
1046                 if (BufferIsLocal(buffer))
1047                 {
1048                         Assert(LocalRefCount[-buffer - 1] > 0);
1049                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
1050                         if (bufHdr->tag.blockNum == blockNum &&
1051                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1052                                 bufHdr->tag.forkNum == forkNum)
1053                                 return buffer;
1054                         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1055                         LocalRefCount[-buffer - 1]--;
1056                 }
1057                 else
1058                 {
1059                         Assert(PrivateRefCount[buffer - 1] > 0);
1060                         bufHdr = &BufferDescriptors[buffer - 1];
1061                         /* we have pin, so it's ok to examine tag without spinlock */
1062                         if (bufHdr->tag.blockNum == blockNum &&
1063                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1064                                 bufHdr->tag.forkNum == forkNum)
1065                                 return buffer;
1066                         UnpinBuffer(bufHdr, true);
1067                 }
1068         }
1069
1070         return ReadBuffer(relation, blockNum);
1071 }
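/*
 * Sketch of the intended calling pattern: a scan stepping from block to
 * block hands its previous buffer to ReleaseAndReadBuffer, which either
 * reuses the pin (same block requested again) or trades it for the new
 * block.  The helper name is illustrative only.
 */
static void
step_through_blocks_sketch(Relation rel)
{
	BlockNumber nblocks = RelationGetNumberOfBlocks(rel);
	BlockNumber blkno;
	Buffer		buf = InvalidBuffer;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		buf = ReleaseAndReadBuffer(buf, rel, blkno);

		LockBuffer(buf, BUFFER_LOCK_SHARE);
		/* ... examine BufferGetPage(buf) ... */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
}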
1072
1073 /*
1074  * PinBuffer -- make buffer unavailable for replacement.
1075  *
1076  * For the default access strategy, the buffer's usage_count is incremented
1077  * when we first pin it; for other strategies we just make sure the usage_count
1078  * isn't zero.  (The idea of the latter is that we don't want synchronized
1079  * heap scans to inflate the count, but we need it to not be zero to discourage
1080  * other backends from stealing buffers from our ring.  As long as we cycle
1081  * through the ring faster than the global clock-sweep cycles, buffers in
1082  * our ring won't be chosen as victims for replacement by other backends.)
1083  *
1084  * This should be applied only to shared buffers, never local ones.
1085  *
1086  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1087  *
1088  * Returns TRUE if buffer is BM_VALID, else FALSE.      This provision allows
1089  * some callers to avoid an extra spinlock cycle.
1090  */
1091 static bool
1092 PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
1093 {
1094         int                     b = buf->buf_id;
1095         bool            result;
1096
1097         if (PrivateRefCount[b] == 0)
1098         {
1099                 LockBufHdr(buf);
1100                 buf->refcount++;
1101                 if (strategy == NULL)
1102                 {
1103                         if (buf->usage_count < BM_MAX_USAGE_COUNT)
1104                                 buf->usage_count++;
1105                 }
1106                 else
1107                 {
1108                         if (buf->usage_count == 0)
1109                                 buf->usage_count = 1;
1110                 }
1111                 result = (buf->flags & BM_VALID) != 0;
1112                 UnlockBufHdr(buf);
1113         }
1114         else
1115         {
1116                 /* If we previously pinned the buffer, it must surely be valid */
1117                 result = true;
1118         }
1119         PrivateRefCount[b]++;
1120         Assert(PrivateRefCount[b] > 0);
1121         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1122                                                                 BufferDescriptorGetBuffer(buf));
1123         return result;
1124 }
1125
1126 /*
1127  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1128  * The spinlock is released before return.
1129  *
1130  * Currently, no callers of this function want to modify the buffer's
1131  * usage_count at all, so there's no need for a strategy parameter.
1132  * Also we don't bother with a BM_VALID test (the caller could check that for
1133  * itself).
1134  *
1135  * Note: use of this routine is frequently mandatory, not just an optimization
1136  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1137  * its state can change under us.
1138  */
1139 static void
1140 PinBuffer_Locked(volatile BufferDesc *buf)
1141 {
1142         int                     b = buf->buf_id;
1143
1144         if (PrivateRefCount[b] == 0)
1145                 buf->refcount++;
1146         UnlockBufHdr(buf);
1147         PrivateRefCount[b]++;
1148         Assert(PrivateRefCount[b] > 0);
1149         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1150                                                                 BufferDescriptorGetBuffer(buf));
1151 }
1152
1153 /*
1154  * UnpinBuffer -- make buffer available for replacement.
1155  *
1156  * This should be applied only to shared buffers, never local ones.
1157  *
1158  * Most but not all callers want CurrentResourceOwner to be adjusted.
1159  * Those that don't should pass fixOwner = FALSE.
1160  */
1161 static void
1162 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
1163 {
1164         int                     b = buf->buf_id;
1165
1166         if (fixOwner)
1167                 ResourceOwnerForgetBuffer(CurrentResourceOwner,
1168                                                                   BufferDescriptorGetBuffer(buf));
1169
1170         Assert(PrivateRefCount[b] > 0);
1171         PrivateRefCount[b]--;
1172         if (PrivateRefCount[b] == 0)
1173         {
1174                 /* I'd better not still hold any locks on the buffer */
1175                 Assert(!LWLockHeldByMe(buf->content_lock));
1176                 Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
1177
1178                 LockBufHdr(buf);
1179
1180                 /* Decrement the shared reference count */
1181                 Assert(buf->refcount > 0);
1182                 buf->refcount--;
1183
1184                 /* Support LockBufferForCleanup() */
1185                 if ((buf->flags & BM_PIN_COUNT_WAITER) &&
1186                         buf->refcount == 1)
1187                 {
1188                         /* we just released the last pin other than the waiter's */
1189                         int                     wait_backend_pid = buf->wait_backend_pid;
1190
1191                         buf->flags &= ~BM_PIN_COUNT_WAITER;
1192                         UnlockBufHdr(buf);
1193                         ProcSendSignal(wait_backend_pid);
1194                 }
1195                 else
1196                         UnlockBufHdr(buf);
1197         }
1198 }
1199
1200 /*
1201  * BufferSync -- Write out all dirty buffers in the pool.
1202  *
1203  * This is called at checkpoint time to write out all dirty shared buffers.
1204  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1205  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN is
1206  * set, we write even unlogged buffers, which are otherwise skipped.  The
1207  * remaining flags currently have no effect here.
1208  */
1209 static void
1210 BufferSync(int flags)
1211 {
1212         int                     buf_id;
1213         int                     num_to_scan;
1214         int                     num_to_write;
1215         int                     num_written;
1216         int                     mask = BM_DIRTY;
1217
1218         /* Make sure we can handle the pin inside SyncOneBuffer */
1219         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1220
1221         /*
1222          * Unless this is a shutdown checkpoint, we write only permanent, dirty
1223          * buffers.  But at shutdown or end of recovery, we write all dirty
1224          * buffers.
1225          */
1226         if (!((flags & CHECKPOINT_IS_SHUTDOWN) || (flags & CHECKPOINT_END_OF_RECOVERY)))
1227                 mask |= BM_PERMANENT;
1228
1229         /*
1230          * Loop over all buffers, and mark the ones that need to be written with
1231          * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
1232          * can estimate how much work needs to be done.
1233          *
1234          * This allows us to write only those pages that were dirty when the
1235          * checkpoint began, and not those that get dirtied while it proceeds.
1236          * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1237          * later in this function, or by normal backends or the bgwriter cleaning
1238          * scan, the flag is cleared.  Any buffer dirtied after this point won't
1239          * have the flag set.
1240          *
1241          * Note that if we fail to write some buffer, we may leave buffers with
1242  * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
1243          * certainly need to be written for the next checkpoint attempt, too.
1244          */
1245         num_to_write = 0;
1246         for (buf_id = 0; buf_id < NBuffers; buf_id++)
1247         {
1248                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1249
1250                 /*
1251                  * Header spinlock is enough to examine BM_DIRTY, see comment in
1252                  * SyncOneBuffer.
1253                  */
1254                 LockBufHdr(bufHdr);
1255
1256                 if ((bufHdr->flags & mask) == mask)
1257                 {
1258                         bufHdr->flags |= BM_CHECKPOINT_NEEDED;
1259                         num_to_write++;
1260                 }
1261
1262                 UnlockBufHdr(bufHdr);
1263         }
1264
1265         if (num_to_write == 0)
1266                 return;                                 /* nothing to do */
1267
1268         TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
1269
1270         /*
1271          * Loop over all buffers again, and write the ones (still) marked with
1272          * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
1273          * since we might as well dump soon-to-be-recycled buffers first.
1274          *
1275          * Note that we don't read the buffer alloc count here --- that should be
1276          * left untouched till the next BgBufferSync() call.
1277          */
1278         buf_id = StrategySyncStart(NULL, NULL);
1279         num_to_scan = NBuffers;
1280         num_written = 0;
1281         while (num_to_scan-- > 0)
1282         {
1283                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1284
1285                 /*
1286                  * We don't need to acquire the lock here, because we're only looking
1287                  * at a single bit. It's possible that someone else writes the buffer
1288                  * and clears the flag right after we check, but that doesn't matter
1289                  * since SyncOneBuffer will then do nothing.  However, there is a
1290                  * further race condition: it's conceivable that between the time we
1291                  * examine the bit here and the time SyncOneBuffer acquires lock,
1292                  * someone else not only wrote the buffer but replaced it with another
1293                  * page and dirtied it.  In that improbable case, SyncOneBuffer will
1294                  * write the buffer though we didn't need to.  It doesn't seem worth
1295                  * guarding against this, though.
1296                  */
1297                 if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
1298                 {
1299                         if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
1300                         {
1301                                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1302                                 BgWriterStats.m_buf_written_checkpoints++;
1303                                 num_written++;
1304
1305                                 /*
1306                                  * We know there are at most num_to_write buffers with
1307                                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
1308                                  * num_written reaches num_to_write.
1309                                  *
1310                                  * Note that num_written doesn't include buffers written by
1311                                  * other backends, or by the bgwriter cleaning scan. That
1312                                  * means that the estimate of how much progress we've made is
1313                                  * conservative, and also that this test will often fail to
1314                                  * trigger.  But it seems worth making anyway.
1315                                  */
1316                                 if (num_written >= num_to_write)
1317                                         break;
1318
1319                                 /*
1320                                  * Sleep to throttle our I/O rate.
1321                                  */
1322                                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
1323                         }
1324                 }
1325
1326                 if (++buf_id >= NBuffers)
1327                         buf_id = 0;
1328         }
1329
1330         /*
1331          * Update checkpoint statistics. As noted above, this doesn't include
1332          * buffers written by other backends or bgwriter scan.
1333          */
1334         CheckpointStats.ckpt_bufs_written += num_written;
1335
1336         TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
1337 }
1338
1339 /*
1340  * BgBufferSync -- Write out some dirty buffers in the pool.
1341  *
1342  * This is called periodically by the background writer process.
1343  *
1344  * Returns true if it's appropriate for the bgwriter process to go into
1345  * low-power hibernation mode.  (This happens if the strategy clock sweep
1346  * has been "lapped" and no buffer allocations have occurred recently,
1347  * or if the bgwriter has been effectively disabled by setting
1348  * bgwriter_lru_maxpages to 0.)
1349  */
1350 bool
1351 BgBufferSync(void)
1352 {
1353         /* info obtained from freelist.c */
1354         int                     strategy_buf_id;
1355         uint32          strategy_passes;
1356         uint32          recent_alloc;
1357
1358         /*
1359          * Information saved between calls so we can determine the strategy
1360          * point's advance rate and avoid scanning already-cleaned buffers.
1361          */
1362         static bool saved_info_valid = false;
1363         static int      prev_strategy_buf_id;
1364         static uint32 prev_strategy_passes;
1365         static int      next_to_clean;
1366         static uint32 next_passes;
1367
1368         /* Moving averages of allocation rate and clean-buffer density */
1369         static float smoothed_alloc = 0;
1370         static float smoothed_density = 10.0;
1371
1372         /* Potentially these could be tunables, but for now, not */
1373         float           smoothing_samples = 16;
1374         float           scan_whole_pool_milliseconds = 120000.0;
1375
1376         /* Used to compute how far we scan ahead */
1377         long            strategy_delta;
1378         int                     bufs_to_lap;
1379         int                     bufs_ahead;
1380         float           scans_per_alloc;
1381         int                     reusable_buffers_est;
1382         int                     upcoming_alloc_est;
1383         int                     min_scan_buffers;
1384
1385         /* Variables for the scanning loop proper */
1386         int                     num_to_scan;
1387         int                     num_written;
1388         int                     reusable_buffers;
1389
1390         /* Variables for final smoothed_density update */
1391         long            new_strategy_delta;
1392         uint32          new_recent_alloc;
1393
1394         /*
1395          * Find out where the freelist clock sweep currently is, and how many
1396          * buffer allocations have happened since our last call.
1397          */
1398         strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
1399
1400         /* Report buffer alloc counts to pgstat */
1401         BgWriterStats.m_buf_alloc += recent_alloc;
1402
1403         /*
1404          * If we're not running the LRU scan, just stop after doing the stats
1405          * stuff.  We mark the saved state invalid so that we can recover sanely
1406          * if LRU scan is turned back on later.
1407          */
1408         if (bgwriter_lru_maxpages <= 0)
1409         {
1410                 saved_info_valid = false;
1411                 return true;
1412         }
1413
1414         /*
1415          * Compute strategy_delta = how many buffers have been scanned by the
1416          * clock sweep since last time.  If first time through, assume none. Then
1417          * see if we are still ahead of the clock sweep, and if so, how many
1418          * buffers we could scan before we'd catch up with it and "lap" it. Note:
1419          * weird-looking coding of xxx_passes comparisons are to avoid bogus
1420          * behavior when the passes counts wrap around.
1421          */
1422         if (saved_info_valid)
1423         {
1424                 int32           passes_delta = strategy_passes - prev_strategy_passes;
1425
1426                 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
1427                 strategy_delta += (long) passes_delta * NBuffers;
1428
1429                 Assert(strategy_delta >= 0);
1430
1431                 if ((int32) (next_passes - strategy_passes) > 0)
1432                 {
1433                         /* we're one pass ahead of the strategy point */
1434                         bufs_to_lap = strategy_buf_id - next_to_clean;
1435 #ifdef BGW_DEBUG
1436                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1437                                  next_passes, next_to_clean,
1438                                  strategy_passes, strategy_buf_id,
1439                                  strategy_delta, bufs_to_lap);
1440 #endif
1441                 }
1442                 else if (next_passes == strategy_passes &&
1443                                  next_to_clean >= strategy_buf_id)
1444                 {
1445                         /* on same pass, but ahead or at least not behind */
1446                         bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
1447 #ifdef BGW_DEBUG
1448                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1449                                  next_passes, next_to_clean,
1450                                  strategy_passes, strategy_buf_id,
1451                                  strategy_delta, bufs_to_lap);
1452 #endif
1453                 }
1454                 else
1455                 {
1456                         /*
1457                          * We're behind, so skip forward to the strategy point and start
1458                          * cleaning from there.
1459                          */
1460 #ifdef BGW_DEBUG
1461                         elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
1462                                  next_passes, next_to_clean,
1463                                  strategy_passes, strategy_buf_id,
1464                                  strategy_delta);
1465 #endif
1466                         next_to_clean = strategy_buf_id;
1467                         next_passes = strategy_passes;
1468                         bufs_to_lap = NBuffers;
1469                 }
1470         }
1471         else
1472         {
1473                 /*
1474                  * Initializing at startup or after LRU scanning had been off. Always
1475                  * start at the strategy point.
1476                  */
1477 #ifdef BGW_DEBUG
1478                 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
1479                          strategy_passes, strategy_buf_id);
1480 #endif
1481                 strategy_delta = 0;
1482                 next_to_clean = strategy_buf_id;
1483                 next_passes = strategy_passes;
1484                 bufs_to_lap = NBuffers;
1485         }
1486
1487         /* Update saved info for next time */
1488         prev_strategy_buf_id = strategy_buf_id;
1489         prev_strategy_passes = strategy_passes;
1490         saved_info_valid = true;
1491
1492         /*
1493          * Compute how many buffers had to be scanned for each new allocation, ie,
1494          * 1/density of reusable buffers, and track a moving average of that.
1495          *
1496          * If the strategy point didn't move, or nothing was allocated, we don't update the density estimate.
1497          */
1498         if (strategy_delta > 0 && recent_alloc > 0)
1499         {
1500                 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
1501                 smoothed_density += (scans_per_alloc - smoothed_density) /
1502                         smoothing_samples;
1503         }
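        /*
         * Worked example with hypothetical numbers: if the clock sweep advanced
         * strategy_delta = 3200 buffers while recent_alloc = 400 allocations
         * occurred, then scans_per_alloc = 8.0; starting from
         * smoothed_density = 10.0 with smoothing_samples = 16, the update above
         * gives 10.0 + (8.0 - 10.0) / 16 = 9.875, i.e. an exponential moving
         * average drifting toward the latest observation.
         */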
1504
1505         /*
1506          * Estimate how many reusable buffers there are between the current
1507          * strategy point and where we've scanned ahead to, based on the smoothed
1508          * density estimate.
1509          */
1510         bufs_ahead = NBuffers - bufs_to_lap;
1511         reusable_buffers_est = (float) bufs_ahead / smoothed_density;
1512
1513         /*
1514          * Track a moving average of recent buffer allocations.  Here, rather than
1515          * a true average we want a fast-attack, slow-decline behavior: we
1516          * immediately follow any increase.
1517          */
1518         if (smoothed_alloc <= (float) recent_alloc)
1519                 smoothed_alloc = recent_alloc;
1520         else
1521                 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
1522                         smoothing_samples;
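        /*
         * Worked example with hypothetical numbers: with smoothed_alloc = 100.0,
         * a burst of recent_alloc = 250 jumps straight to 250 (fast attack); if
         * allocations then drop to 50, the next call decays it only to
         * 250 + (50 - 250) / 16 = 237.5 (slow decline).
         */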
1523
1524         /* Scale the estimate by a GUC to allow more aggressive tuning. */
1525         upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
1526
1527         /*
1528          * If recent_alloc remains at zero for many cycles, smoothed_alloc will
1529          * eventually underflow to zero, and the underflows produce annoying
1530          * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
1531          * zero, there's no point in tracking smaller and smaller values of
1532          * smoothed_alloc, so just reset it to exactly zero to avoid this
1533          * syndrome.  It will pop back up as soon as recent_alloc increases.
1534          */
1535         if (upcoming_alloc_est == 0)
1536                 smoothed_alloc = 0;
1537
1538         /*
1539          * Even in cases where there's been little or no buffer allocation
1540          * activity, we want to make a small amount of progress through the buffer
1541          * cache so that as many reusable buffers as possible are clean after an
1542          * idle period.
1543          *
1544          * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
1545          * the BGW will be called during the scan_whole_pool time; slice the
1546          * buffer pool into that many sections.
1547          */
1548         min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
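        /*
         * Worked example with a hypothetical configuration: NBuffers = 16384
         * (128MB of shared_buffers) and the default BgWriterDelay = 200ms give
         * 16384 / (120000.0 / 200) = 16384 / 600, i.e. a minimum of about 27
         * buffers per round.
         */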
1549
1550         if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
1551         {
1552 #ifdef BGW_DEBUG
1553                 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
1554                          upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
1555 #endif
1556                 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
1557         }
1558
1559         /*
1560          * Now write out dirty reusable buffers, working forward from the
1561          * next_to_clean point, until we have lapped the strategy scan, or cleaned
1562          * enough buffers to match our estimate of the next cycle's allocation
1563          * requirements, or hit the bgwriter_lru_maxpages limit.
1564          */
1565
1566         /* Make sure we can handle the pin inside SyncOneBuffer */
1567         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1568
1569         num_to_scan = bufs_to_lap;
1570         num_written = 0;
1571         reusable_buffers = reusable_buffers_est;
1572
1573         /* Execute the LRU scan */
1574         while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
1575         {
1576                 int                     buffer_state = SyncOneBuffer(next_to_clean, true);
1577
1578                 if (++next_to_clean >= NBuffers)
1579                 {
1580                         next_to_clean = 0;
1581                         next_passes++;
1582                 }
1583                 num_to_scan--;
1584
1585                 if (buffer_state & BUF_WRITTEN)
1586                 {
1587                         reusable_buffers++;
1588                         if (++num_written >= bgwriter_lru_maxpages)
1589                         {
1590                                 BgWriterStats.m_maxwritten_clean++;
1591                                 break;
1592                         }
1593                 }
1594                 else if (buffer_state & BUF_REUSABLE)
1595                         reusable_buffers++;
1596         }
1597
1598         BgWriterStats.m_buf_written_clean += num_written;
1599
1600 #ifdef BGW_DEBUG
1601         elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
1602                  recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
1603                  smoothed_density, reusable_buffers_est, upcoming_alloc_est,
1604                  bufs_to_lap - num_to_scan,
1605                  num_written,
1606                  reusable_buffers - reusable_buffers_est);
1607 #endif
1608
1609         /*
1610          * Consider the above scan as being like a new allocation scan.
1611          * Characterize its density and update the smoothed one based on it. This
1612          * effectively halves the moving average period in cases where both the
1613          * strategy and the background writer are doing some useful scanning,
1614          * which is helpful because a long memory isn't as desirable on the
1615          * density estimates.
1616          */
1617         new_strategy_delta = bufs_to_lap - num_to_scan;
1618         new_recent_alloc = reusable_buffers - reusable_buffers_est;
1619         if (new_strategy_delta > 0 && new_recent_alloc > 0)
1620         {
1621                 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
1622                 smoothed_density += (scans_per_alloc - smoothed_density) /
1623                         smoothing_samples;
1624
1625 #ifdef BGW_DEBUG
1626                 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
1627                          new_recent_alloc, new_strategy_delta,
1628                          scans_per_alloc, smoothed_density);
1629 #endif
1630         }
1631
1632         /* Return true if OK to hibernate */
1633         return (bufs_to_lap == 0 && recent_alloc == 0);
1634 }
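/*
 * Simplified sketch: the background writer's main loop in
 * postmaster/bgwriter.c uses the return value roughly like this to choose
 * between its normal nap and hibernation:
 *
 *              bool    can_hibernate = BgBufferSync();
 *
 *              if (can_hibernate)
 *                      ... sleep for a long time, until woken via its latch ...
 *              else
 *                      ... sleep for BgWriterDelay milliseconds ...
 */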
1635
1636 /*
1637  * SyncOneBuffer -- process a single buffer during syncing.
1638  *
1639  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
1640  * buffers marked recently used, as these are not replacement candidates.
1641  *
1642  * Returns a bitmask containing the following flag bits:
1643  *      BUF_WRITTEN: we wrote the buffer.
1644  *      BUF_REUSABLE: buffer is available for replacement, ie, it has
1645  *              pin count 0 and usage count 0.
1646  *
1647  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
1648  * after locking it, but we don't care all that much.)
1649  *
1650  * Note: caller must have done ResourceOwnerEnlargeBuffers.
1651  */
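/*
 * For illustration: a return value of BUF_WRITTEN | BUF_REUSABLE (0x03) means
 * the buffer was written out and is also a replacement candidate;
 * BgBufferSync() above counts either bit toward reusable_buffers, while
 * BufferSync() only looks at BUF_WRITTEN.
 */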
1652 static int
1653 SyncOneBuffer(int buf_id, bool skip_recently_used)
1654 {
1655         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1656         int                     result = 0;
1657
1658         /*
1659          * Check whether buffer needs writing.
1660          *
1661          * We can make this check without taking the buffer content lock so long
1662          * as we mark pages dirty in access methods *before* logging changes with
1663          * XLogInsert(): if someone marks the buffer dirty just after our check we
1664          * don't worry because our checkpoint.redo points before the log record for
1665          * the upcoming changes, so we need not write such a dirty buffer.
1666          */
1667         LockBufHdr(bufHdr);
1668
1669         if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
1670                 result |= BUF_REUSABLE;
1671         else if (skip_recently_used)
1672         {
1673                 /* Caller told us not to write recently-used buffers */
1674                 UnlockBufHdr(bufHdr);
1675                 return result;
1676         }
1677
1678         if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
1679         {
1680                 /* It's clean, so nothing to do */
1681                 UnlockBufHdr(bufHdr);
1682                 return result;
1683         }
1684
1685         /*
1686          * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
1687          * buffer is clean by the time we've locked it.)
1688          */
1689         PinBuffer_Locked(bufHdr);
1690         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
1691
1692         FlushBuffer(bufHdr, NULL);
1693
1694         LWLockRelease(bufHdr->content_lock);
1695         UnpinBuffer(bufHdr, true);
1696
1697         return result | BUF_WRITTEN;
1698 }
1699
1700
1701 /*
1702  *              AtEOXact_Buffers - clean up at end of transaction.
1703  *
1704  *              As of PostgreSQL 8.0, buffer pins should get released by the
1705  *              ResourceOwner mechanism.  This routine is just a debugging
1706  *              cross-check that no pins remain.
1707  */
1708 void
1709 AtEOXact_Buffers(bool isCommit)
1710 {
1711 #ifdef USE_ASSERT_CHECKING
1712         if (assert_enabled)
1713         {
1714                 int                     RefCountErrors = 0;
1715                 Buffer          b;
1716
1717                 for (b = 1; b <= NBuffers; b++)
1718                 {
1719                         if (PrivateRefCount[b - 1] != 0)
1720                         {
1721                                 PrintBufferLeakWarning(b);
1722                                 RefCountErrors++;
1723                         }
1724                 }
1725                 Assert(RefCountErrors == 0);
1726         }
1727 #endif
1728
1729         AtEOXact_LocalBuffers(isCommit);
1730 }
1731
1732 /*
1733  * InitBufferPoolBackend --- second-stage initialization of a new backend
1734  *
1735  * This is called after we have acquired a PGPROC and so can safely get
1736  * LWLocks.  We don't currently need to do anything at this stage ...
1737  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
1738  * access, and thereby has to be called at the corresponding phase of
1739  * backend shutdown.
1740  */
1741 void
1742 InitBufferPoolBackend(void)
1743 {
1744         on_shmem_exit(AtProcExit_Buffers, 0);
1745 }
1746
1747 /*
1748  * During backend exit, ensure that we released all shared-buffer locks and
1749  * assert that we have no remaining pins.
1750  */
1751 static void
1752 AtProcExit_Buffers(int code, Datum arg)
1753 {
1754         AbortBufferIO();
1755         UnlockBuffers();
1756
1757 #ifdef USE_ASSERT_CHECKING
1758         if (assert_enabled)
1759         {
1760                 int                     RefCountErrors = 0;
1761                 Buffer          b;
1762
1763                 for (b = 1; b <= NBuffers; b++)
1764                 {
1765                         if (PrivateRefCount[b - 1] != 0)
1766                         {
1767                                 PrintBufferLeakWarning(b);
1768                                 RefCountErrors++;
1769                         }
1770                 }
1771                 Assert(RefCountErrors == 0);
1772         }
1773 #endif
1774
1775         /* localbuf.c needs a chance too */
1776         AtProcExit_LocalBuffers();
1777 }
1778
1779 /*
1780  * Helper routine to issue warnings when a buffer is unexpectedly pinned
1781  */
1782 void
1783 PrintBufferLeakWarning(Buffer buffer)
1784 {
1785         volatile BufferDesc *buf;
1786         int32           loccount;
1787         char       *path;
1788         BackendId       backend;
1789
1790         Assert(BufferIsValid(buffer));
1791         if (BufferIsLocal(buffer))
1792         {
1793                 buf = &LocalBufferDescriptors[-buffer - 1];
1794                 loccount = LocalRefCount[-buffer - 1];
1795                 backend = MyBackendId;
1796         }
1797         else
1798         {
1799                 buf = &BufferDescriptors[buffer - 1];
1800                 loccount = PrivateRefCount[buffer - 1];
1801                 backend = InvalidBackendId;
1802         }
1803
1804         /* theoretically we should lock the bufhdr here */
1805         path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
1806         elog(WARNING,
1807                  "buffer refcount leak: [%03d] "
1808                  "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
1809                  buffer, path,
1810                  buf->tag.blockNum, buf->flags,
1811                  buf->refcount, loccount);
1812         pfree(path);
1813 }
1814
1815 /*
1816  * CheckPointBuffers
1817  *
1818  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
1819  *
1820  * Note: temporary relations do not participate in checkpoints, so they don't
1821  * need to be flushed.
1822  */
1823 void
1824 CheckPointBuffers(int flags)
1825 {
1826         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
1827         CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
1828         BufferSync(flags);
1829         CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
1830         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
1831         smgrsync();
1832         CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
1833         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
1834 }
1835
1836
1837 /*
1838  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
1839  */
1840 void
1841 BufmgrCommit(void)
1842 {
1843         /* Nothing to do in bufmgr anymore... */
1844 }
1845
1846 /*
1847  * BufferGetBlockNumber
1848  *              Returns the block number associated with a buffer.
1849  *
1850  * Note:
1851  *              Assumes that the buffer is valid and pinned, else the
1852  *              value may be obsolete immediately...
1853  */
1854 BlockNumber
1855 BufferGetBlockNumber(Buffer buffer)
1856 {
1857         volatile BufferDesc *bufHdr;
1858
1859         Assert(BufferIsPinned(buffer));
1860
1861         if (BufferIsLocal(buffer))
1862                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1863         else
1864                 bufHdr = &BufferDescriptors[buffer - 1];
1865
1866         /* pinned, so OK to read tag without spinlock */
1867         return bufHdr->tag.blockNum;
1868 }
1869
1870 /*
1871  * BufferGetTag
1872  *              Returns the relfilenode, fork number and block number associated with
1873  *              a buffer.
1874  */
1875 void
1876 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
1877                          BlockNumber *blknum)
1878 {
1879         volatile BufferDesc *bufHdr;
1880
1881         /* Do the same checks as BufferGetBlockNumber. */
1882         Assert(BufferIsPinned(buffer));
1883
1884         if (BufferIsLocal(buffer))
1885                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1886         else
1887                 bufHdr = &BufferDescriptors[buffer - 1];
1888
1889         /* pinned, so OK to read tag without spinlock */
1890         *rnode = bufHdr->tag.rnode;
1891         *forknum = bufHdr->tag.forkNum;
1892         *blknum = bufHdr->tag.blockNum;
1893 }
1894
1895 /*
1896  * FlushBuffer
1897  *              Physically write out a shared buffer.
1898  *
1899  * NOTE: this actually just passes the buffer contents to the kernel; the
1900  * real write to disk won't happen until the kernel feels like it.  This
1901  * is okay from our point of view since we can redo the changes from WAL.
1902  * However, we will need to force the changes to disk via fsync before
1903  * we can checkpoint WAL.
1904  *
1905  * The caller must hold a pin on the buffer and have share-locked the
1906  * buffer contents.  (Note: a share-lock does not prevent updates of
1907  * hint bits in the buffer, so the page could change while the write
1908  * is in progress, but we assume that that will not invalidate the data
1909  * written.)
1910  *
1911  * If the caller has an smgr reference for the buffer's relation, pass it
1912  * as the second parameter.  If not, pass NULL.
1913  */
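/*
 * For illustration, with a hypothetical LSN: if the page header records
 * LSN 0/16B3D50, the XLogFlush() call below first forces WAL through
 * 0/16B3D50 to disk, and only afterwards is the page image handed to
 * smgrwrite(); for permanent relations this ordering is what preserves the
 * WAL-before-data rule.
 */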
1914 static void
1915 FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1916 {
1917         XLogRecPtr      recptr;
1918         ErrorContextCallback errcallback;
1919         instr_time      io_start,
1920                                 io_time;
1921         Block           bufBlock;
1922         char       *bufToWrite;
1923
1924         /*
1925          * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
1926          * false, then someone else flushed the buffer before we could, so we need
1927          * not do anything.
1928          */
1929         if (!StartBufferIO(buf, false))
1930                 return;
1931
1932         /* Setup error traceback support for ereport() */
1933         errcallback.callback = shared_buffer_write_error_callback;
1934         errcallback.arg = (void *) buf;
1935         errcallback.previous = error_context_stack;
1936         error_context_stack = &errcallback;
1937
1938         /* Find smgr relation for buffer */
1939         if (reln == NULL)
1940                 reln = smgropen(buf->tag.rnode, InvalidBackendId);
1941
1942         TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
1943                                                                                 buf->tag.blockNum,
1944                                                                                 reln->smgr_rnode.node.spcNode,
1945                                                                                 reln->smgr_rnode.node.dbNode,
1946                                                                                 reln->smgr_rnode.node.relNode);
1947
1948         LockBufHdr(buf);
1949
1950         /*
1951          * Run PageGetLSN while holding header lock, since we don't have the
1952          * buffer locked exclusively in all cases.
1953          */
1954         recptr = BufferGetLSN(buf);
1955
1956         /* To check if block content changes while flushing. - vadim 01/17/97 */
1957         buf->flags &= ~BM_JUST_DIRTIED;
1958         UnlockBufHdr(buf);
1959
1960         /*
1961          * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
1962          * rule that log updates must hit disk before any of the data-file changes
1963          * they describe do.
1964          *
1965          * However, this rule does not apply to unlogged relations, which will be
1966          * lost after a crash anyway.  Most unlogged relation pages do not bear
1967          * LSNs since we never emit WAL records for them, and therefore flushing
1968          * up through the buffer LSN would be useless, but harmless.  However,
1969          * GiST indexes use LSNs internally to track page-splits, and therefore
1970          * unlogged GiST pages bear "fake" LSNs generated by
1971          * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
1972          * LSN counter could advance past the WAL insertion point; and if it did
1973          * happen, attempting to flush WAL through that location would fail, with
1974          * disastrous system-wide consequences.  To make sure that can't happen,
1975          * skip the flush if the buffer isn't permanent.
1976          */
1977         if (buf->flags & BM_PERMANENT)
1978                 XLogFlush(recptr);
1979
1980         /*
1981          * Now it's safe to write buffer to disk. Note that no one else should
1982          * have been able to write it while we were busy with log flushing because
1983          * we have the io_in_progress lock.
1984          */
1985         bufBlock = BufHdrGetBlock(buf);
1986
1987         /*
1988          * Update page checksum if desired.  Since we have only shared lock on the
1989          * buffer, other processes might be updating hint bits in it, so we must
1990          * copy the page to private storage if we do checksumming.
1991          */
1992         bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
1993
1994         if (track_io_timing)
1995                 INSTR_TIME_SET_CURRENT(io_start);
1996
1997         /*
1998          * bufToWrite is either the shared buffer or a copy, as appropriate.
1999          */
2000         smgrwrite(reln,
2001                           buf->tag.forkNum,
2002                           buf->tag.blockNum,
2003                           bufToWrite,
2004                           false);
2005
2006         if (track_io_timing)
2007         {
2008                 INSTR_TIME_SET_CURRENT(io_time);
2009                 INSTR_TIME_SUBTRACT(io_time, io_start);
2010                 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2011                 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2012         }
2013
2014         pgBufferUsage.shared_blks_written++;
2015
2016         /*
2017          * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2018          * end the io_in_progress state.
2019          */
2020         TerminateBufferIO(buf, true, 0);
2021
2022         TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2023                                                                            buf->tag.blockNum,
2024                                                                            reln->smgr_rnode.node.spcNode,
2025                                                                            reln->smgr_rnode.node.dbNode,
2026                                                                            reln->smgr_rnode.node.relNode);
2027
2028         /* Pop the error context stack */
2029         error_context_stack = errcallback.previous;
2030 }
2031
2032 /*
2033  * RelationGetNumberOfBlocksInFork
2034  *              Determines the current number of pages in the specified fork of the relation.
2035  */
2036 BlockNumber
2037 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2038 {
2039         /* Open it at the smgr level if not already done */
2040         RelationOpenSmgr(relation);
2041
2042         return smgrnblocks(relation->rd_smgr, forkNum);
2043 }
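/*
 * Usage note: the common case is the main fork, e.g.
 *
 *              BlockNumber     nblocks = RelationGetNumberOfBlocksInFork(rel, MAIN_FORKNUM);
 *
 * which is what the RelationGetNumberOfBlocks() macro in bufmgr.h expands to.
 */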
2044
2045 /*
2046  * BufferIsPermanent
2047  *              Determines whether a buffer will potentially still be around after
2048  *              a crash.  Caller must hold a buffer pin.
2049  */
2050 bool
2051 BufferIsPermanent(Buffer buffer)
2052 {
2053         volatile BufferDesc *bufHdr;
2054
2055         /* Local buffers are used only for temp relations. */
2056         if (BufferIsLocal(buffer))
2057                 return false;
2058
2059         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2060         Assert(BufferIsValid(buffer));
2061         Assert(BufferIsPinned(buffer));
2062
2063         /*
2064          * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2065          * need not bother with the buffer header spinlock.  Even if someone else
2066          * changes the buffer header flags while we're doing this, we assume that
2067          * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
2068          * old value or the new value, but not random garbage.
2069          */
2070         bufHdr = &BufferDescriptors[buffer - 1];
2071         return (bufHdr->flags & BM_PERMANENT) != 0;
2072 }
2073
2074 /*
2075  * BufferGetLSNAtomic
2076  *              Retrieves the LSN of the buffer atomically using a buffer header lock.
2077  *              This is necessary for some callers who may not have an exclusive lock
2078  *              on the buffer.
2079  */
2080 XLogRecPtr
2081 BufferGetLSNAtomic(Buffer buffer)
2082 {
2083         volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
2084         char       *page = BufferGetPage(buffer);
2085         XLogRecPtr      lsn;
2086
2087         /*
2088          * If we don't need locking for correctness, fastpath out.
2089          */
2090         if (!DataChecksumsEnabled() || BufferIsLocal(buffer))
2091                 return PageGetLSN(page);
2092
2093         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2094         Assert(BufferIsValid(buffer));
2095         Assert(BufferIsPinned(buffer));
2096
2097         LockBufHdr(bufHdr);
2098         lsn = PageGetLSN(page);
2099         UnlockBufHdr(bufHdr);
2100
2101         return lsn;
2102 }
2103
2104 /* ---------------------------------------------------------------------
2105  *              DropRelFileNodeBuffers
2106  *
2107  *              This function removes from the buffer pool all the pages of the
2108  *              specified relation fork that have block numbers >= firstDelBlock.
2109  *              (In particular, with firstDelBlock = 0, all pages are removed.)
2110  *              Dirty pages are simply dropped, without bothering to write them
2111  *              out first.  Therefore, this is NOT rollback-able, and so should be
2112  *              used only with extreme caution!
2113  *
2114  *              Currently, this is called only from smgr.c when the underlying file
2115  *              is about to be deleted or truncated (firstDelBlock is needed for
2116  *              the truncation case).  The data in the affected pages would therefore
2117  *              be deleted momentarily anyway, and there is no point in writing it.
2118  *              It is the responsibility of higher-level code to ensure that the
2119  *              deletion or truncation does not lose any data that could be needed
2120  *              later.  It is also the responsibility of higher-level code to ensure
2121  *              that no other process could be trying to load more pages of the
2122  *              relation into buffers.
2123  *
2124  *              XXX currently this sequentially searches the buffer pool; it should be
2125  *              changed to a more clever way of searching.  However, this routine
2126  *              is used only in code paths that aren't very performance-critical,
2127  *              and we shouldn't slow down the hot paths to make it faster ...
2128  * --------------------------------------------------------------------
2129  */
2130 void
2131 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2132                                            BlockNumber firstDelBlock)
2133 {
2134         int                     i;
2135
2136         /* If it's a local relation, it's localbuf.c's problem. */
2137         if (RelFileNodeBackendIsTemp(rnode))
2138         {
2139                 if (rnode.backend == MyBackendId)
2140                         DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2141                 return;
2142         }
2143
2144         for (i = 0; i < NBuffers; i++)
2145         {
2146                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2147
2148                 /*
2149                  * We can make this a tad faster by prechecking the buffer tag before
2150                  * we attempt to lock the buffer; this saves a lot of lock
2151                  * acquisitions in typical cases.  It should be safe because the
2152                  * caller must have AccessExclusiveLock on the relation, or some other
2153                  * reason to be certain that no one is loading new pages of the rel
2154                  * into the buffer pool.  (Otherwise we might well miss such pages
2155                  * entirely.)  Therefore, while the tag might be changing while we
2156                  * look at it, it can't be changing *to* a value we care about, only
2157                  * *away* from such a value.  So false negatives are impossible, and
2158                  * false positives are safe because we'll recheck after getting the
2159                  * buffer lock.
2160                  *
2161                  * We could check forkNum and blockNum as well as the rnode, but the
2162                  * incremental win from doing so seems small.
2163                  */
2164                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2165                         continue;
2166
2167                 LockBufHdr(bufHdr);
2168                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2169                         bufHdr->tag.forkNum == forkNum &&
2170                         bufHdr->tag.blockNum >= firstDelBlock)
2171                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2172                 else
2173                         UnlockBufHdr(bufHdr);
2174         }
2175 }
2176
2177 /* ---------------------------------------------------------------------
2178  *              DropRelFileNodesAllBuffers
2179  *
2180  *              This function removes from the buffer pool all the pages of all
2181  *              forks of the specified relations.  It's equivalent to calling
2182  *              DropRelFileNodeBuffers once per fork per relation with
2183  *              firstDelBlock = 0.
2184  * --------------------------------------------------------------------
2185  */
2186 void
2187 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2188 {
2189         int                     i,
2190                                 n = 0;
2191         RelFileNode *nodes;
2192         bool            use_bsearch;
2193
2194         if (nnodes == 0)
2195                 return;
2196
2197         nodes = palloc(sizeof(RelFileNode) * nnodes);           /* non-local relations */
2198
2199         /* If it's a local relation, it's localbuf.c's problem. */
2200         for (i = 0; i < nnodes; i++)
2201         {
2202                 if (RelFileNodeBackendIsTemp(rnodes[i]))
2203                 {
2204                         if (rnodes[i].backend == MyBackendId)
2205                                 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2206                 }
2207                 else
2208                         nodes[n++] = rnodes[i].node;
2209         }
2210
2211         /*
2212          * If there are no non-local relations, then we're done. Release the
2213          * memory and return.
2214          */
2215         if (n == 0)
2216         {
2217                 pfree(nodes);
2218                 return;
2219         }
2220
2221         /*
2222          * For a small number of relations to drop, just use a simple walk-through to
2223          * save the bsearch overhead.  The threshold is more of a guess than
2224          * an exactly determined value, as it depends on many factors (CPU and RAM
2225          * speeds, amount of shared buffers etc.).
2226          */
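        /*
         * For illustration: with DROP_RELS_BSEARCH_THRESHOLD defined as 20 near
         * the top of this file, dropping e.g. 5 relations keeps the simple
         * per-buffer scan of the small array, while dropping 100 relations
         * sorts them once and then does a bsearch per buffer (about
         * log2(100) ~= 7 comparisons instead of up to 100).
         */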
2227         use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
2228
2229         /* sort the list of rnodes if necessary */
2230         if (use_bsearch)
2231                 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2232
2233         for (i = 0; i < NBuffers; i++)
2234         {
2235                 RelFileNode *rnode = NULL;
2236                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2237
2238                 /*
2239                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2240                  * and saves some cycles.
2241                  */
2242
2243                 if (!use_bsearch)
2244                 {
2245                         int                     j;
2246
2247                         for (j = 0; j < n; j++)
2248                         {
2249                                 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
2250                                 {
2251                                         rnode = &nodes[j];
2252                                         break;
2253                                 }
2254                         }
2255                 }
2256                 else
2257                 {
2258                         rnode = bsearch((const void *) &(bufHdr->tag.rnode),
2259                                                         nodes, n, sizeof(RelFileNode),
2260                                                         rnode_comparator);
2261                 }
2262
2263                 /* buffer doesn't belong to any of the given relfilenodes; skip it */
2264                 if (rnode == NULL)
2265                         continue;
2266
2267                 LockBufHdr(bufHdr);
2268                 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
2269                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2270                 else
2271                         UnlockBufHdr(bufHdr);
2272         }
2273
2274         pfree(nodes);
2275 }
2276
2277 /* ---------------------------------------------------------------------
2278  *              DropDatabaseBuffers
2279  *
2280  *              This function removes all the buffers in the buffer cache for a
2281  *              particular database.  Dirty pages are simply dropped, without
2282  *              bothering to write them out first.  This is used when we destroy a
2283  *              database, to avoid trying to flush data to disk when the directory
2284  *              tree no longer exists.  Implementation is pretty similar to
2285  *              DropRelFileNodeBuffers() which is for destroying just one relation.
2286  * --------------------------------------------------------------------
2287  */
2288 void
2289 DropDatabaseBuffers(Oid dbid)
2290 {
2291         int                     i;
2292
2293         /*
2294          * We needn't consider local buffers, since by assumption the target
2295          * database isn't our own.
2296          */
2297
2298         for (i = 0; i < NBuffers; i++)
2299         {
2300                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2301
2302                 /*
2303                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2304                  * and saves some cycles.
2305                  */
2306                 if (bufHdr->tag.rnode.dbNode != dbid)
2307                         continue;
2308
2309                 LockBufHdr(bufHdr);
2310                 if (bufHdr->tag.rnode.dbNode == dbid)
2311                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2312                 else
2313                         UnlockBufHdr(bufHdr);
2314         }
2315 }
2316
2317 /* -----------------------------------------------------------------
2318  *              PrintBufferDescs
2319  *
2320  *              this function prints all the buffer descriptors, for debugging
2321  *              use only.
2322  * -----------------------------------------------------------------
2323  */
2324 #ifdef NOT_USED
2325 void
2326 PrintBufferDescs(void)
2327 {
2328         int                     i;
2329         volatile BufferDesc *buf = BufferDescriptors;
2330
2331         for (i = 0; i < NBuffers; ++i, ++buf)
2332         {
2333                 /* theoretically we should lock the bufhdr here */
2334                 elog(LOG,
2335                          "[%02d] (freeNext=%d, rel=%s, "
2336                          "blockNum=%u, flags=0x%x, refcount=%u %d)",
2337                          i, buf->freeNext,
2338                   relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2339                          buf->tag.blockNum, buf->flags,
2340                          buf->refcount, PrivateRefCount[i]);
2341         }
2342 }
2343 #endif
2344
2345 #ifdef NOT_USED
2346 void
2347 PrintPinnedBufs(void)
2348 {
2349         int                     i;
2350         volatile BufferDesc *buf = BufferDescriptors;
2351
2352         for (i = 0; i < NBuffers; ++i, ++buf)
2353         {
2354                 if (PrivateRefCount[i] > 0)
2355                 {
2356                         /* theoretically we should lock the bufhdr here */
2357                         elog(LOG,
2358                                  "[%02d] (freeNext=%d, rel=%s, "
2359                                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
2360                                  i, buf->freeNext,
2361                                  relpath(buf->tag.rnode, buf->tag.forkNum),
2362                                  buf->tag.blockNum, buf->flags,
2363                                  buf->refcount, PrivateRefCount[i]);
2364                 }
2365         }
2366 }
2367 #endif
2368
2369 /* ---------------------------------------------------------------------
2370  *              FlushRelationBuffers
2371  *
2372  *              This function writes all dirty pages of a relation out to disk
2373  *              (or more accurately, out to kernel disk buffers), ensuring that the
2374  *              kernel has an up-to-date view of the relation.
2375  *
2376  *              Generally, the caller should be holding AccessExclusiveLock on the
2377  *              target relation to ensure that no other backend is busy dirtying
2378  *              more blocks of the relation; the effects can't be expected to last
2379  *              after the lock is released.
2380  *
2381  *              XXX currently this sequentially searches the buffer pool; it should be
2382  *              changed to a more clever way of searching.  This routine is not
2383  *              used in any performance-critical code paths, so it's not worth
2384  *              adding additional overhead to normal paths to make it go faster;
2385  *              but see also DropRelFileNodeBuffers.
2386  * --------------------------------------------------------------------
2387  */
2388 void
2389 FlushRelationBuffers(Relation rel)
2390 {
2391         int                     i;
2392         volatile BufferDesc *bufHdr;
2393
2394         /* Open rel at the smgr level if not already done */
2395         RelationOpenSmgr(rel);
2396
2397         if (RelationUsesLocalBuffers(rel))
2398         {
2399                 for (i = 0; i < NLocBuffer; i++)
2400                 {
2401                         bufHdr = &LocalBufferDescriptors[i];
2402                         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2403                                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2404                         {
2405                                 ErrorContextCallback errcallback;
2406                                 Page            localpage;
2407
2408                                 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
2409
2410                                 /* Setup error traceback support for ereport() */
2411                                 errcallback.callback = local_buffer_write_error_callback;
2412                                 errcallback.arg = (void *) bufHdr;
2413                                 errcallback.previous = error_context_stack;
2414                                 error_context_stack = &errcallback;
2415
2416                                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
2417
2418                                 smgrwrite(rel->rd_smgr,
2419                                                   bufHdr->tag.forkNum,
2420                                                   bufHdr->tag.blockNum,
2421                                                   localpage,
2422                                                   false);
2423
2424                                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
2425
2426                                 /* Pop the error context stack */
2427                                 error_context_stack = errcallback.previous;
2428                         }
2429                 }
2430
2431                 return;
2432         }
2433
2434         /* Make sure we can handle the pin inside the loop */
2435         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2436
2437         for (i = 0; i < NBuffers; i++)
2438         {
2439                 bufHdr = &BufferDescriptors[i];
2440
2441                 /*
2442                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2443                  * and saves some cycles.
2444                  */
2445                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
2446                         continue;
2447
2448                 LockBufHdr(bufHdr);
2449                 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2450                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2451                 {
2452                         PinBuffer_Locked(bufHdr);
2453                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2454                         FlushBuffer(bufHdr, rel->rd_smgr);
2455                         LWLockRelease(bufHdr->content_lock);
2456                         UnpinBuffer(bufHdr, true);
2457                 }
2458                 else
2459                         UnlockBufHdr(bufHdr);
2460         }
2461 }
2462
2463 /* ---------------------------------------------------------------------
2464  *              FlushDatabaseBuffers
2465  *
2466  *              This function writes all dirty pages of a database out to disk
2467  *              (or more accurately, out to kernel disk buffers), ensuring that the
2468  *              kernel has an up-to-date view of the database.
2469  *
2470  *              Generally, the caller should be holding an appropriate lock to ensure
2471  *              no other backend is active in the target database; otherwise more
2472  *              pages could get dirtied.
2473  *
2474  *              Note we don't worry about flushing any pages of temporary relations.
2475  *              It's assumed these wouldn't be interesting.
2476  * --------------------------------------------------------------------
2477  */
2478 void
2479 FlushDatabaseBuffers(Oid dbid)
2480 {
2481         int                     i;
2482         volatile BufferDesc *bufHdr;
2483
2484         /* Make sure we can handle the pin inside the loop */
2485         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2486
2487         for (i = 0; i < NBuffers; i++)
2488         {
2489                 bufHdr = &BufferDescriptors[i];
2490
2491                 /*
2492                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2493                  * and saves some cycles.
2494                  */
2495                 if (bufHdr->tag.rnode.dbNode != dbid)
2496                         continue;
2497
2498                 LockBufHdr(bufHdr);
2499                 if (bufHdr->tag.rnode.dbNode == dbid &&
2500                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2501                 {
2502                         PinBuffer_Locked(bufHdr);
2503                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2504                         FlushBuffer(bufHdr, NULL);
2505                         LWLockRelease(bufHdr->content_lock);
2506                         UnpinBuffer(bufHdr, true);
2507                 }
2508                 else
2509                         UnlockBufHdr(bufHdr);
2510         }
2511 }
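
/*
 * Illustrative sketch (hypothetical caller; the example_* name is made up):
 * push every dirty shared buffer of the current database out to the kernel
 * before an operation that will read the underlying files directly.
 * MyDatabaseId comes from miscadmin.h, which is already included above.
 * Real callers (e.g. CREATE DATABASE flushing the source database before a
 * file-level copy) work the same way, but also fsync the files afterwards
 * if they need true durability.
 */
static void
example_flush_current_database(void)
{
	FlushDatabaseBuffers(MyDatabaseId);
}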
2512
2513 /*
2514  * ReleaseBuffer -- release the pin on a buffer
2515  */
2516 void
2517 ReleaseBuffer(Buffer buffer)
2518 {
2519         volatile BufferDesc *bufHdr;
2520
2521         if (!BufferIsValid(buffer))
2522                 elog(ERROR, "bad buffer ID: %d", buffer);
2523
2524         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
2525
2526         if (BufferIsLocal(buffer))
2527         {
2528                 Assert(LocalRefCount[-buffer - 1] > 0);
2529                 LocalRefCount[-buffer - 1]--;
2530                 return;
2531         }
2532
2533         bufHdr = &BufferDescriptors[buffer - 1];
2534
2535         Assert(PrivateRefCount[buffer - 1] > 0);
2536
2537         if (PrivateRefCount[buffer - 1] > 1)
2538                 PrivateRefCount[buffer - 1]--;
2539         else
2540                 UnpinBuffer(bufHdr, false);
2541 }
2542
2543 /*
2544  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
2545  *
2546  * This is just a shorthand for a common combination.
2547  */
2548 void
2549 UnlockReleaseBuffer(Buffer buffer)
2550 {
2551         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2552         ReleaseBuffer(buffer);
2553 }
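
/*
 * Illustrative sketch (hypothetical helper): the usual read-only access
 * pattern.  ReadBuffer() pins the page, LockBuffer() holds the content lock
 * only while the page is inspected, and UnlockReleaseBuffer() drops the
 * lock and the pin in one call (it is just LockBuffer + ReleaseBuffer).
 */
static OffsetNumber
example_page_item_count(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);
	OffsetNumber maxoff;

	LockBuffer(buf, BUFFER_LOCK_SHARE);
	maxoff = PageGetMaxOffsetNumber(BufferGetPage(buf));
	UnlockReleaseBuffer(buf);

	return maxoff;
}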
2554
2555 /*
2556  * IncrBufferRefCount
2557  *              Increment the pin count on a buffer that we have *already* pinned
2558  *              at least once.
2559  *
2560  *              This function cannot be used on a buffer we do not have pinned,
2561  *              because it doesn't change the shared buffer state.
2562  */
2563 void
2564 IncrBufferRefCount(Buffer buffer)
2565 {
2566         Assert(BufferIsPinned(buffer));
2567         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2568         ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
2569         if (BufferIsLocal(buffer))
2570                 LocalRefCount[-buffer - 1]++;
2571         else
2572                 PrivateRefCount[buffer - 1]++;
2573 }
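
/*
 * Illustrative sketch (hypothetical helper): duplicating a buffer reference.
 * The caller must already hold a pin on 'buffer'; after this call there are
 * two references, and each must eventually be given back with its own
 * ReleaseBuffer().
 */
static Buffer
example_copy_buffer_reference(Buffer buffer)
{
	Assert(BufferIsValid(buffer));
	IncrBufferRefCount(buffer);
	return buffer;				/* second reference, released separately */
}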
2574
2575 /*
2576  * MarkBufferDirtyHint
2577  *
2578  *      Mark a buffer dirty for non-critical changes.
2579  *
2580  * This is essentially the same as MarkBufferDirty, except:
2581  *
2582  * 1. The caller does not write WAL; so if checksums are enabled, we may need
2583  *        to write an XLOG_HINT WAL record to protect against torn pages.
2584  * 2. The caller might have only share-lock instead of exclusive-lock on the
2585  *        buffer's content lock.
2586  * 3. This function does not guarantee that the buffer is always marked dirty
2587  *        (due to a race condition), so it cannot be used for important changes.
2588  */
2589 void
2590 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
2591 {
2592         volatile BufferDesc *bufHdr;
2593         Page            page = BufferGetPage(buffer);
2594
2595         if (!BufferIsValid(buffer))
2596                 elog(ERROR, "bad buffer ID: %d", buffer);
2597
2598         if (BufferIsLocal(buffer))
2599         {
2600                 MarkLocalBufferDirty(buffer);
2601                 return;
2602         }
2603
2604         bufHdr = &BufferDescriptors[buffer - 1];
2605
2606         Assert(PrivateRefCount[buffer - 1] > 0);
2607         /* here, either share or exclusive lock is OK */
2608         Assert(LWLockHeldByMe(bufHdr->content_lock));
2609
2610         /*
2611          * This routine might get called many times on the same page, if we are
2612          * making the first scan after commit of an xact that added/deleted many
2613          * tuples. So, be as quick as we can if the buffer is already dirty.  We
2614          * do this by not acquiring the spinlock if it looks like the status bits
2615          * are already set.  Since we make this test unlocked, there's a chance we
2616          * might fail to notice that the flags have just been cleared, and fail to
2617          * set them again, due to memory-ordering issues.  But since this function
2618          * is only intended to be used in cases where failing to write out the
2619          * data would be harmless anyway, it doesn't really matter.
2620          */
2621         if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
2622                 (BM_DIRTY | BM_JUST_DIRTIED))
2623         {
2624                 XLogRecPtr      lsn = InvalidXLogRecPtr;
2625                 bool            dirtied = false;
2626                 bool            delayChkpt = false;
2627
2628                 /*
2629                  * If we need to protect hint bit updates from torn writes, WAL-log a
2630                  * full page image of the page. This full page image is only necessary
2631                  * if the hint bit update is the first change to the page since the
2632                  * last checkpoint.
2633                  *
2634                  * We don't check full_page_writes here because that logic is included
2635                  * when we call XLogInsert() since the value changes dynamically.
2636                  */
2637                 if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
2638                 {
2639                         /*
2640                          * If we're in recovery we cannot dirty a page because of a hint.
2641                          * We can set the hint, just not mark the page dirty as a result, so
2642                          * the hint is lost when the page is evicted or at shutdown.
2643                          *
2644                          * See src/backend/storage/page/README for longer discussion.
2645                          */
2646                         if (RecoveryInProgress())
2647                                 return;
2648
2649                         /*
2650                          * If the block is already dirty because we either made a change
2651                          * or set a hint already, then we don't need to write a full page
2652                          * image.  Note that aggressive cleaning of blocks dirtied by hint
2653                          * bit setting would increase the call rate. Bulk setting of hint
2654                          * bits would reduce the call rate...
2655                          *
2656                          * We must issue the WAL record before we mark the buffer dirty.
2657                          * Otherwise we might write the page before we write the WAL. That
2658                          * causes a race condition, since a checkpoint might occur between
2659                          * writing the WAL record and marking the buffer dirty. We solve
2660                          * that with a kluge, but one that is already in use during
2661                          * transaction commit to prevent race conditions. Basically, we
2662                          * simply prevent the checkpoint WAL record from being written
2663                          * until we have marked the buffer dirty. We don't start the
2664                          * checkpoint flush until we have marked dirty, so our checkpoint
2665                          * must flush the change to disk successfully or the checkpoint
2666                          * never gets written, so crash recovery will fix.
2667                          * never gets written, in which case crash recovery will fix things up.
2668                          * It's possible we may enter here without an xid, so it is
2669                          * essential that CreateCheckpoint waits for virtual transactions
2670                          * rather than full transactionids.
2671                          */
2672                         MyPgXact->delayChkpt = delayChkpt = true;
2673                         lsn = XLogSaveBufferForHint(buffer, buffer_std);
2674                 }
2675
2676                 LockBufHdr(bufHdr);
2677                 Assert(bufHdr->refcount > 0);
2678                 if (!(bufHdr->flags & BM_DIRTY))
2679                 {
2680                         dirtied = true;         /* Means "will be dirtied by this action" */
2681
2682                         /*
2683                          * Set the page LSN if we wrote a backup block. We aren't supposed
2684                          * to set this when only holding a share lock but as long as we
2685                          * serialise it somehow we're OK. We choose to set LSN while
2686                          * holding the buffer header lock, which causes any reader of an
2687                          * LSN who holds only a share lock to also obtain a buffer header
2688                          * lock before using PageGetLSN(), which is enforced in
2689                          * BufferGetLSNAtomic().
2690                          *
2691                          * If checksums are enabled, you might think we should reset the
2692                          * checksum here. That will happen when the page is written
2693                          * sometime later in this checkpoint cycle.
2694                          */
2695                         if (!XLogRecPtrIsInvalid(lsn))
2696                                 PageSetLSN(page, lsn);
2697                 }
2698                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
2699                 UnlockBufHdr(bufHdr);
2700
2701                 if (delayChkpt)
2702                         MyPgXact->delayChkpt = false;
2703
2704                 if (dirtied)
2705                 {
2706                         VacuumPageDirty++;
2707                         if (VacuumCostActive)
2708                                 VacuumCostBalance += VacuumCostPageDirty;
2709                 }
2710         }
2711 }
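
/*
 * Illustrative sketch (hypothetical helper, in the style of btree's
 * _bt_killitems): marking an index item dead is a pure hint, so a caller
 * holding a pin and at least a share content lock uses MarkBufferDirtyHint()
 * instead of MarkBufferDirty() and writes no WAL of its own.  Losing the
 * hint (see point 3 above) only costs a recheck later.
 */
static void
example_mark_item_dead(Buffer buffer, OffsetNumber offnum)
{
	Page		page = BufferGetPage(buffer);
	ItemId		itemid = PageGetItemId(page, offnum);

	ItemIdMarkDead(itemid);
	MarkBufferDirtyHint(buffer, true);	/* true: standard page layout */
}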
2712
2713 /*
2714  * Release buffer content locks for shared buffers.
2715  *
2716  * Used to clean up after errors.
2717  *
2718  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
2719  * of releasing buffer content locks per se; the only thing we need to deal
2720  * with here is clearing any PIN_COUNT request that was in progress.
2721  */
2722 void
2723 UnlockBuffers(void)
2724 {
2725         volatile BufferDesc *buf = PinCountWaitBuf;
2726
2727         if (buf)
2728         {
2729                 LockBufHdr(buf);
2730
2731                 /*
2732                  * Don't complain if flag bit not set; it could have been reset but we
2733                  * got a cancel/die interrupt before getting the signal.
2734                  */
2735                 if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
2736                         buf->wait_backend_pid == MyProcPid)
2737                         buf->flags &= ~BM_PIN_COUNT_WAITER;
2738
2739                 UnlockBufHdr(buf);
2740
2741                 PinCountWaitBuf = NULL;
2742         }
2743 }
2744
2745 /*
2746  * Acquire or release the content_lock for the buffer.
2747  */
2748 void
2749 LockBuffer(Buffer buffer, int mode)
2750 {
2751         volatile BufferDesc *buf;
2752
2753         Assert(BufferIsValid(buffer));
2754         if (BufferIsLocal(buffer))
2755                 return;                                 /* local buffers need no lock */
2756
2757         buf = &(BufferDescriptors[buffer - 1]);
2758
2759         if (mode == BUFFER_LOCK_UNLOCK)
2760                 LWLockRelease(buf->content_lock);
2761         else if (mode == BUFFER_LOCK_SHARE)
2762                 LWLockAcquire(buf->content_lock, LW_SHARED);
2763         else if (mode == BUFFER_LOCK_EXCLUSIVE)
2764                 LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
2765         else
2766                 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
2767 }
2768
2769 /*
2770  * Acquire the content_lock for the buffer, but only if we don't have to wait.
2771  *
2772  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
2773  */
2774 bool
2775 ConditionalLockBuffer(Buffer buffer)
2776 {
2777         volatile BufferDesc *buf;
2778
2779         Assert(BufferIsValid(buffer));
2780         if (BufferIsLocal(buffer))
2781                 return true;                    /* act as though we got it */
2782
2783         buf = &(BufferDescriptors[buffer - 1]);
2784
2785         return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
2786 }
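
/*
 * Illustrative sketch (hypothetical helper): opportunistic locking.  Try for
 * the exclusive content lock without blocking; if another backend holds the
 * lock, report failure so the caller can look at a different page instead of
 * waiting.  The caller is assumed to hold a pin on the buffer already.
 */
static bool
example_try_exclusive_work(Buffer buffer)
{
	if (!ConditionalLockBuffer(buffer))
		return false;			/* contended; caller tries elsewhere */

	/* ... modify the page, MarkBufferDirty(), emit WAL as appropriate ... */

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	return true;
}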
2787
2788 /*
2789  * LockBufferForCleanup - lock a buffer in preparation for deleting items
2790  *
2791  * Items may be deleted from a disk page only when the caller (a) holds an
2792  * exclusive lock on the buffer and (b) has observed that no other backend
2793  * holds a pin on the buffer.  If there is a pin, then the other backend
2794  * might have a pointer into the buffer (for example, a heapscan reference
2795  * to an item --- see README for more details).  It's OK if a pin is added
2796  * after the cleanup starts, however; the newly-arrived backend will be
2797  * unable to look at the page until we release the exclusive lock.
2798  *
2799  * To implement this protocol, a would-be deleter must pin the buffer and
2800  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
2801  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
2802  * it has successfully observed pin count = 1.
2803  */
2804 void
2805 LockBufferForCleanup(Buffer buffer)
2806 {
2807         volatile BufferDesc *bufHdr;
2808
2809         Assert(BufferIsValid(buffer));
2810         Assert(PinCountWaitBuf == NULL);
2811
2812         if (BufferIsLocal(buffer))
2813         {
2814                 /* There should be exactly one pin */
2815                 if (LocalRefCount[-buffer - 1] != 1)
2816                         elog(ERROR, "incorrect local pin count: %d",
2817                                  LocalRefCount[-buffer - 1]);
2818                 /* Nobody else to wait for */
2819                 return;
2820         }
2821
2822         /* There should be exactly one local pin */
2823         if (PrivateRefCount[buffer - 1] != 1)
2824                 elog(ERROR, "incorrect local pin count: %d",
2825                          PrivateRefCount[buffer - 1]);
2826
2827         bufHdr = &BufferDescriptors[buffer - 1];
2828
2829         for (;;)
2830         {
2831                 /* Try to acquire lock */
2832                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
2833                 LockBufHdr(bufHdr);
2834                 Assert(bufHdr->refcount > 0);
2835                 if (bufHdr->refcount == 1)
2836                 {
2837                         /* Successfully acquired exclusive lock with pincount 1 */
2838                         UnlockBufHdr(bufHdr);
2839                         return;
2840                 }
2841                 /* Failed, so mark myself as waiting for pincount 1 */
2842                 if (bufHdr->flags & BM_PIN_COUNT_WAITER)
2843                 {
2844                         UnlockBufHdr(bufHdr);
2845                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2846                         elog(ERROR, "multiple backends attempting to wait for pincount 1");
2847                 }
2848                 bufHdr->wait_backend_pid = MyProcPid;
2849                 bufHdr->flags |= BM_PIN_COUNT_WAITER;
2850                 PinCountWaitBuf = bufHdr;
2851                 UnlockBufHdr(bufHdr);
2852                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2853
2854                 /* Wait to be signaled by UnpinBuffer() */
2855                 if (InHotStandby)
2856                 {
2857                         /* Publish the bufid that Startup process waits on */
2858                         SetStartupBufferPinWaitBufId(buffer - 1);
2859                         /* Set alarm and then wait to be signaled by UnpinBuffer() */
2860                         ResolveRecoveryConflictWithBufferPin();
2861                         /* Reset the published bufid */
2862                         SetStartupBufferPinWaitBufId(-1);
2863                 }
2864                 else
2865                         ProcWaitForSignal();
2866
2867                 PinCountWaitBuf = NULL;
2868                 /* Loop back and try again */
2869         }
2870 }
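
/*
 * Illustrative sketch (hypothetical helper, loosely modeled on how VACUUM
 * operates): take the one and only pin on a block, then wait for a cleanup
 * lock before removing items.  'rel' and 'blkno' are illustrative
 * parameters; a real caller would typically also use a buffer access
 * strategy.
 */
static void
example_cleanup_block(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);

	LockBufferForCleanup(buf);	/* returns with pin count == 1, lock held */

	/* ... safe here to defragment the page or remove dead items ... */

	UnlockReleaseBuffer(buf);
}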
2871
2872 /*
2873  * Check called from RecoveryConflictInterrupt handler when Startup
2874  * process requests cancellation of all pin holders that are blocking it.
2875  */
2876 bool
2877 HoldingBufferPinThatDelaysRecovery(void)
2878 {
2879         int                     bufid = GetStartupBufferPinWaitBufId();
2880
2881         /*
2882          * If we get woken slowly then it's possible that the Startup process was
2883          * already woken by other backends before we got here.  It's also possible
2884          * that we get here via multiple interrupts or interrupts at inappropriate
2885          * times, so make sure we do nothing if the bufid is not set.
2886          */
2887         if (bufid < 0)
2888                 return false;
2889
2890         if (PrivateRefCount[bufid] > 0)
2891                 return true;
2892
2893         return false;
2894 }
2895
2896 /*
2897  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
2898  *
2899  * We won't loop, but just check once to see if the pin count is OK.  If
2900  * not, return FALSE with no lock held.
2901  */
2902 bool
2903 ConditionalLockBufferForCleanup(Buffer buffer)
2904 {
2905         volatile BufferDesc *bufHdr;
2906
2907         Assert(BufferIsValid(buffer));
2908
2909         if (BufferIsLocal(buffer))
2910         {
2911                 /* There should be exactly one pin */
2912                 Assert(LocalRefCount[-buffer - 1] > 0);
2913                 if (LocalRefCount[-buffer - 1] != 1)
2914                         return false;
2915                 /* Nobody else to wait for */
2916                 return true;
2917         }
2918
2919         /* There should be exactly one local pin */
2920         Assert(PrivateRefCount[buffer - 1] > 0);
2921         if (PrivateRefCount[buffer - 1] != 1)
2922                 return false;
2923
2924         /* Try to acquire lock */
2925         if (!ConditionalLockBuffer(buffer))
2926                 return false;
2927
2928         bufHdr = &BufferDescriptors[buffer - 1];
2929         LockBufHdr(bufHdr);
2930         Assert(bufHdr->refcount > 0);
2931         if (bufHdr->refcount == 1)
2932         {
2933                 /* Successfully acquired exclusive lock with pincount 1 */
2934                 UnlockBufHdr(bufHdr);
2935                 return true;
2936         }
2937
2938         /* Failed, so release the lock */
2939         UnlockBufHdr(bufHdr);
2940         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2941         return false;
2942 }
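
/*
 * Illustrative sketch (hypothetical helper): the non-blocking variant, in
 * the spirit of lazy VACUUM, which prefers to skip a page it cannot get a
 * cleanup lock on rather than wait behind other pin holders.
 */
static bool
example_try_cleanup_block(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);

	if (!ConditionalLockBufferForCleanup(buf))
	{
		ReleaseBuffer(buf);
		return false;			/* busy; caller can retry later */
	}

	/* ... cleanup work, as in the previous sketch ... */

	UnlockReleaseBuffer(buf);
	return true;
}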
2943
2944
2945 /*
2946  *      Functions for buffer I/O handling
2947  *
2948  *      Note: We assume that nested buffer I/O never occurs.
2949  *      i.e., at most one io_in_progress lock is held per proc.
2950  *
2951  *      Also note that these are used only for shared buffers, not local ones.
2952  */
2953
2954 /*
2955  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
2956  */
2957 static void
2958 WaitIO(volatile BufferDesc *buf)
2959 {
2960         /*
2961          * Changed to wait until there's no IO - Inoue 01/13/2000
2962          *
2963          * Note this is *necessary* because an error abort in the process doing
2964          * I/O could release the io_in_progress_lock prematurely. See
2965          * AbortBufferIO.
2966          */
2967         for (;;)
2968         {
2969                 BufFlags        sv_flags;
2970
2971                 /*
2972                  * It may not be necessary to acquire the spinlock to check the flag
2973                  * here, but since this test is essential for correctness, we'd better
2974                  * play it safe.
2975                  */
2976                 LockBufHdr(buf);
2977                 sv_flags = buf->flags;
2978                 UnlockBufHdr(buf);
2979                 if (!(sv_flags & BM_IO_IN_PROGRESS))
2980                         break;
2981                 LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
2982                 LWLockRelease(buf->io_in_progress_lock);
2983         }
2984 }
2985
2986 /*
2987  * StartBufferIO: begin I/O on this buffer
2988  *      (Assumptions)
2989  *      My process is executing no IO
2990  *      The buffer is Pinned
2991  *
2992  * In some scenarios there are race conditions in which multiple backends
2993  * could attempt the same I/O operation concurrently.  If someone else
2994  * has already started I/O on this buffer then we will block on the
2995  * io_in_progress lock until he's done.
2996  *
2997  * Input operations are only attempted on buffers that are not BM_VALID,
2998  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
2999  * so we can always tell if the work is already done.
3000  *
3001  * Returns TRUE if we successfully marked the buffer as I/O busy,
3002  * FALSE if someone else already did the work.
3003  */
3004 static bool
3005 StartBufferIO(volatile BufferDesc *buf, bool forInput)
3006 {
3007         Assert(!InProgressBuf);
3008
3009         for (;;)
3010         {
3011                 /*
3012                  * Grab the io_in_progress lock so that other processes can wait for
3013                  * me to finish the I/O.
3014                  */
3015                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3016
3017                 LockBufHdr(buf);
3018
3019                 if (!(buf->flags & BM_IO_IN_PROGRESS))
3020                         break;
3021
3022                 /*
3023                  * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3024                  * lock isn't held is if the process doing the I/O is recovering from
3025                  * an error (see AbortBufferIO).  If that's the case, we must wait for
3026                  * him to get unwedged.
3027                  */
3028                 UnlockBufHdr(buf);
3029                 LWLockRelease(buf->io_in_progress_lock);
3030                 WaitIO(buf);
3031         }
3032
3033         /* Once we get here, there is definitely no I/O active on this buffer */
3034
3035         if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
3036         {
3037                 /* someone else already did the I/O */
3038                 UnlockBufHdr(buf);
3039                 LWLockRelease(buf->io_in_progress_lock);
3040                 return false;
3041         }
3042
3043         buf->flags |= BM_IO_IN_PROGRESS;
3044
3045         UnlockBufHdr(buf);
3046
3047         InProgressBuf = buf;
3048         IsForInput = forInput;
3049
3050         return true;
3051 }
3052
3053 /*
3054  * TerminateBufferIO: release a buffer we were doing I/O on
3055  *      (Assumptions)
3056  *      My process is executing IO for the buffer
3057  *      BM_IO_IN_PROGRESS bit is set for the buffer
3058  *      We hold the buffer's io_in_progress lock
3059  *      The buffer is Pinned
3060  *
3061  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3062  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
3063  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
3064  * marking the buffer clean if it was re-dirtied while we were writing.
3065  *
3066  * set_flag_bits gets ORed into the buffer's flags.  It must include
3067  * BM_IO_ERROR in a failure case.  For successful completion it could
3068  * be 0, or BM_VALID if we just finished reading in the page.
3069  */
3070 static void
3071 TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
3072                                   int set_flag_bits)
3073 {
3074         Assert(buf == InProgressBuf);
3075
3076         LockBufHdr(buf);
3077
3078         Assert(buf->flags & BM_IO_IN_PROGRESS);
3079         buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3080         if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
3081                 buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3082         buf->flags |= set_flag_bits;
3083
3084         UnlockBufHdr(buf);
3085
3086         InProgressBuf = NULL;
3087
3088         LWLockRelease(buf->io_in_progress_lock);
3089 }
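
/*
 * Illustrative sketch (hypothetical, condensed from the shape of
 * FlushBuffer): how a writer drives the protocol above.  The caller is
 * assumed to hold a pin and a share content lock on the buffer; the actual
 * smgrwrite() call, checksum handling and WAL flush are elided.
 */
static void
example_write_buffer(volatile BufferDesc *buf)
{
	if (!StartBufferIO(buf, false))
		return;					/* someone else already wrote it out */

	/* ... copy the block and hand it to smgrwrite(), as FlushBuffer does ... */

	/* Success: clear BM_IO_IN_PROGRESS, and BM_DIRTY unless re-dirtied */
	TerminateBufferIO(buf, true, 0);
}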
3090
3091 /*
3092  * AbortBufferIO: Clean up any active buffer I/O after an error.
3093  *
3094  *      All LWLocks we might have held have been released,
3095  *      but we haven't yet released buffer pins, so the buffer is still pinned.
3096  *
3097  *      If I/O was in progress, we always set BM_IO_ERROR, even though it's
3098  *      possible the error condition wasn't related to the I/O.
3099  */
3100 void
3101 AbortBufferIO(void)
3102 {
3103         volatile BufferDesc *buf = InProgressBuf;
3104
3105         if (buf)
3106         {
3107                 /*
3108                  * Since LWLockReleaseAll has already been called, we're not holding
3109                  * the buffer's io_in_progress_lock. We have to re-acquire it so that
3110                  * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3111                  * buffer will be in a busy spin until we succeed in doing this.
3112                  */
3113                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3114
3115                 LockBufHdr(buf);
3116                 Assert(buf->flags & BM_IO_IN_PROGRESS);
3117                 if (IsForInput)
3118                 {
3119                         Assert(!(buf->flags & BM_DIRTY));
3120                         /* We'd better not think buffer is valid yet */
3121                         Assert(!(buf->flags & BM_VALID));
3122                         UnlockBufHdr(buf);
3123                 }
3124                 else
3125                 {
3126                         BufFlags        sv_flags;
3127
3128                         sv_flags = buf->flags;
3129                         Assert(sv_flags & BM_DIRTY);
3130                         UnlockBufHdr(buf);
3131                         /* Issue notice if this is not the first failure... */
3132                         if (sv_flags & BM_IO_ERROR)
3133                         {
3134                                 /* Buffer is pinned, so we can read tag without spinlock */
3135                                 char       *path;
3136
3137                                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
3138                                 ereport(WARNING,
3139                                                 (errcode(ERRCODE_IO_ERROR),
3140                                                  errmsg("could not write block %u of %s",
3141                                                                 buf->tag.blockNum, path),
3142                                                  errdetail("Multiple failures --- write error might be permanent.")));
3143                                 pfree(path);
3144                         }
3145                 }
3146                 TerminateBufferIO(buf, false, BM_IO_ERROR);
3147         }
3148 }
3149
3150 /*
3151  * Error context callback for errors occurring during shared buffer writes.
3152  */
3153 static void
3154 shared_buffer_write_error_callback(void *arg)
3155 {
3156         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3157
3158         /* Buffer is pinned, so we can read the tag without locking the spinlock */
3159         if (bufHdr != NULL)
3160         {
3161                 char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
3162
3163                 errcontext("writing block %u of relation %s",
3164                                    bufHdr->tag.blockNum, path);
3165                 pfree(path);
3166         }
3167 }
3168
3169 /*
3170  * Error context callback for errors occurring during local buffer writes.
3171  */
3172 static void
3173 local_buffer_write_error_callback(void *arg)
3174 {
3175         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3176
3177         if (bufHdr != NULL)
3178         {
3179                 char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
3180                                                                                   bufHdr->tag.forkNum);
3181
3182                 errcontext("writing block %u of relation %s",
3183                                    bufHdr->tag.blockNum, path);
3184                 pfree(path);
3185         }
3186 }
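
/*
 * Illustrative sketch (hypothetical, mirroring how the flush routines in
 * this file use these callbacks): push an ErrorContextCallback so that any
 * ereport() raised during the write identifies the block being written,
 * then pop it again afterwards.
 */
static void
example_with_write_error_context(volatile BufferDesc *bufHdr)
{
	ErrorContextCallback errcallback;

	/* Setup error traceback support for ereport() */
	errcallback.callback = shared_buffer_write_error_callback;
	errcallback.arg = (void *) bufHdr;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* ... perform the write that might ereport() here ... */

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
}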
3187
3188 /*
3189  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
3190  */
3191 static int
3192 rnode_comparator(const void *p1, const void *p2)
3193 {
3194         RelFileNode n1 = *(RelFileNode *) p1;
3195         RelFileNode n2 = *(RelFileNode *) p2;
3196
3197         if (n1.relNode < n2.relNode)
3198                 return -1;
3199         else if (n1.relNode > n2.relNode)
3200                 return 1;
3201
3202         if (n1.dbNode < n2.dbNode)
3203                 return -1;
3204         else if (n1.dbNode > n2.dbNode)
3205                 return 1;
3206
3207         if (n1.spcNode < n2.spcNode)
3208                 return -1;
3209         else if (n1.spcNode > n2.spcNode)
3210                 return 1;
3211         else
3212                 return 0;
3213 }
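
/*
 * Illustrative sketch (hypothetical helper): the comparator is intended for
 * pg_qsort()/bsearch(), as in DropRelFileNodesAllBuffers.  Sort the array
 * once, and membership tests become O(log N) instead of a linear scan.
 */
static bool
example_rnode_in_set(RelFileNode *nodes, int nnodes, RelFileNode key)
{
	pg_qsort(nodes, nnodes, sizeof(RelFileNode), rnode_comparator);

	return bsearch((const void *) &key,
				   nodes, nnodes, sizeof(RelFileNode),
				   rnode_comparator) != NULL;
}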