1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *              and pin it so that no one can destroy it while this process
20  *              is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *              The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *              freelist.c -- chooses victim for buffer replacement
29  *              buf_table.c -- manages the buffer lookup table
30  */
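
/*
 * A minimal usage sketch of these entry points, assuming the caller already
 * holds an open, suitably locked Relation (identifier names here are
 * illustrative only):
 *
 *      buf = ReadBuffer(rel, blkno);
 *      LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *      page = BufferGetPage(buf);
 *      ... modify the page contents ...
 *      MarkBufferDirty(buf);
 *      UnlockReleaseBuffer(buf);
 *
 * WAL-logging the change, where required, is the caller's responsibility and
 * is omitted from this sketch.
 */
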
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "catalog/catalog.h"
37 #include "executor/instrument.h"
38 #include "miscadmin.h"
39 #include "pg_trace.h"
40 #include "pgstat.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/buf_internals.h"
43 #include "storage/bufmgr.h"
44 #include "storage/ipc.h"
45 #include "storage/proc.h"
46 #include "storage/smgr.h"
47 #include "storage/standby.h"
48 #include "utils/rel.h"
49 #include "utils/resowner.h"
50 #include "utils/timestamp.h"
51
52
53 /* Note: these two macros only work on shared buffers, not local ones! */
54 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
55 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
56
57 /* Note: this macro only works on local buffers, not shared ones! */
58 #define LocalBufHdrGetBlock(bufHdr) \
59         LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
60
61 /* Bits in SyncOneBuffer's return value */
62 #define BUF_WRITTEN                             0x01
63 #define BUF_REUSABLE                    0x02
64
65
66 /* GUC variables */
67 bool            zero_damaged_pages = false;
68 int                     bgwriter_lru_maxpages = 100;
69 double          bgwriter_lru_multiplier = 2.0;
70 bool            track_io_timing = false;
71
72 /*
73  * How many buffers PrefetchBuffer callers should try to stay ahead of their
74  * ReadBuffer calls by.  This is maintained by the assign hook for
75  * effective_io_concurrency.  Zero means "never prefetch".
76  */
77 int                     target_prefetch_pages = 0;
78
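/*
 * The GUCs above correspond to postgresql.conf settings; the values shown
 * here simply mirror the built-in defaults (illustrative, not a
 * recommendation):
 *
 *      zero_damaged_pages = off
 *      bgwriter_lru_maxpages = 100
 *      bgwriter_lru_multiplier = 2.0
 *      track_io_timing = off
 *      effective_io_concurrency = 1
 */
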
79 /* local state for StartBufferIO and related functions */
80 static volatile BufferDesc *InProgressBuf = NULL;
81 static bool IsForInput;
82
83 /* local state for LockBufferForCleanup */
84 static volatile BufferDesc *PinCountWaitBuf = NULL;
85
86
87 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
88                                   ForkNumber forkNum, BlockNumber blockNum,
89                                   ReadBufferMode mode, BufferAccessStrategy strategy,
90                                   bool *hit);
91 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
92 static void PinBuffer_Locked(volatile BufferDesc *buf);
93 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
94 static void BufferSync(int flags);
95 static int      SyncOneBuffer(int buf_id, bool skip_recently_used);
96 static void WaitIO(volatile BufferDesc *buf);
97 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
98 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
99                                   int set_flag_bits);
100 static void shared_buffer_write_error_callback(void *arg);
101 static void local_buffer_write_error_callback(void *arg);
102 static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
103                         char relpersistence,
104                         ForkNumber forkNum,
105                         BlockNumber blockNum,
106                         BufferAccessStrategy strategy,
107                         bool *foundPtr);
108 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
109 static void AtProcExit_Buffers(int code, Datum arg);
110
111
112 /*
113  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
114  *
115  * This is named by analogy to ReadBuffer but doesn't actually allocate a
116  * buffer.      Instead it tries to ensure that a future ReadBuffer for the given
117  * block will not be delayed by the I/O.  Prefetching is optional.
118  * No-op if prefetching isn't compiled in.
119  */
120 void
121 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
122 {
123 #ifdef USE_PREFETCH
124         Assert(RelationIsValid(reln));
125         Assert(BlockNumberIsValid(blockNum));
126
127         /* Open it at the smgr level if not already done */
128         RelationOpenSmgr(reln);
129
130         if (RelationUsesLocalBuffers(reln))
131         {
132                 /* see comments in ReadBufferExtended */
133                 if (RELATION_IS_OTHER_TEMP(reln))
134                         ereport(ERROR,
135                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
136                                 errmsg("cannot access temporary tables of other sessions")));
137
138                 /* pass it off to localbuf.c */
139                 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
140         }
141         else
142         {
143                 BufferTag       newTag;         /* identity of requested block */
144                 uint32          newHash;        /* hash value for newTag */
145                 LWLockId        newPartitionLock;       /* buffer partition lock for it */
146                 int                     buf_id;
147
148                 /* create a tag so we can lookup the buffer */
149                 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
150                                            forkNum, blockNum);
151
152                 /* determine its hash code and partition lock ID */
153                 newHash = BufTableHashCode(&newTag);
154                 newPartitionLock = BufMappingPartitionLock(newHash);
155
156                 /* see if the block is in the buffer pool already */
157                 LWLockAcquire(newPartitionLock, LW_SHARED);
158                 buf_id = BufTableLookup(&newTag, newHash);
159                 LWLockRelease(newPartitionLock);
160
161                 /* If not in buffers, initiate prefetch */
162                 if (buf_id < 0)
163                         smgrprefetch(reln->rd_smgr, forkNum, blockNum);
164
165                 /*
166                  * If the block *is* in buffers, we do nothing.  This is not really
167                  * ideal: the block might be just about to be evicted, which would be
168                  * stupid since we know we are going to need it soon.  But the only
169                  * easy answer is to bump the usage_count, which does not seem like a
170                  * great solution: when the caller does ultimately touch the block,
171                  * usage_count would get bumped again, resulting in too much
172                  * favoritism for blocks that are involved in a prefetch sequence. A
173                  * real fix would involve some additional per-buffer state, and it's
174                  * not clear that there's enough of a problem to justify that.
175                  */
176         }
177 #endif   /* USE_PREFETCH */
178 }
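
/*
 * A sketch of how a sequential reader might stay target_prefetch_pages
 * ahead of its ReadBuffer calls (a hypothetical loop, not taken from any
 * particular caller):
 *
 *      for (blkno = 0; blkno < nblocks; blkno++)
 *      {
 *          if (blkno + target_prefetch_pages < nblocks)
 *              PrefetchBuffer(rel, MAIN_FORKNUM,
 *                             blkno + target_prefetch_pages);
 *
 *          buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
 *                                   RBM_NORMAL, NULL);
 *          ... use the page ...
 *          ReleaseBuffer(buf);
 *      }
 */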
179
180
181 /*
182  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
183  *              fork with RBM_NORMAL mode and default strategy.
184  */
185 Buffer
186 ReadBuffer(Relation reln, BlockNumber blockNum)
187 {
188         return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
189 }
190
191 /*
192  * ReadBufferExtended -- returns a buffer containing the requested
193  *              block of the requested relation.  If the blknum
194  *              requested is P_NEW, extend the relation file and
195  *              allocate a new block.  (Caller is responsible for
196  *              ensuring that only one backend tries to extend a
197  *              relation at the same time!)
198  *
199  * Returns: the buffer number for the buffer containing
200  *              the block read.  The returned buffer has been pinned.
201  *              Does not return on error --- elog's instead.
202  *
203  * Assumes that reln has already been opened when this function is called.
204  *
205  * In RBM_NORMAL mode, the page is read from disk, and the page header is
206  * validated. An error is thrown if the page header is not valid.
207  *
208  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
209  * valid, the page is zeroed instead of throwing an error. This is intended
210  * for non-critical data, where the caller is prepared to repair errors.
211  *
212  * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
213  * with zeros instead of reading it from disk.  Useful when the caller is
214  * going to fill the page from scratch, since this saves I/O and avoids
215  * unnecessary failure if the page-on-disk has corrupt page headers.
216  * Caution: do not use this mode to read a page that is beyond the relation's
217  * current physical EOF; that is likely to cause problems in md.c when
218  * the page is modified and written out. P_NEW is OK, though.
219  *
220  * If strategy is not NULL, a nondefault buffer access strategy is used.
221  * See buffer/README for details.
222  */
223 Buffer
224 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
225                                    ReadBufferMode mode, BufferAccessStrategy strategy)
226 {
227         bool            hit;
228         Buffer          buf;
229
230         /* Open it at the smgr level if not already done */
231         RelationOpenSmgr(reln);
232
233         /*
234          * Reject attempts to read non-local temporary relations; we would be
235          * likely to get wrong data since we have no visibility into the owning
236          * session's local buffers.
237          */
238         if (RELATION_IS_OTHER_TEMP(reln))
239                 ereport(ERROR,
240                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
241                                  errmsg("cannot access temporary tables of other sessions")));
242
243         /*
244          * Read the buffer, and update pgstat counters to reflect a cache hit or
245          * miss.
246          */
247         pgstat_count_buffer_read(reln);
248         buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
249                                                         forkNum, blockNum, mode, strategy, &hit);
250         if (hit)
251                 pgstat_count_buffer_hit(reln);
252         return buf;
253 }
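
/*
 * A sketch of passing a nondefault strategy, e.g. for a bulk sequential read
 * that should not flood the buffer cache (GetAccessStrategy lives in
 * freelist.c; the BAS_* strategy types are declared in bufmgr.h):
 *
 *      BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
 *
 *      buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
 *                               RBM_NORMAL, strategy);
 *      ... use the page ...
 *      ReleaseBuffer(buf);
 *      FreeAccessStrategy(strategy);
 */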
254
255
256 /*
257  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
258  *              a relcache entry for the relation.
259  *
260  * NB: At present, this function may only be used on permanent relations, which
261  * is OK, because we only use it during XLOG replay.  If in the future we
262  * want to use it on temporary or unlogged relations, we could pass additional
263  * parameters.
264  */
265 Buffer
266 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
267                                                   BlockNumber blockNum, ReadBufferMode mode,
268                                                   BufferAccessStrategy strategy)
269 {
270         bool            hit;
271
272         SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
273
274         return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
275                                                          mode, strategy, &hit);
276 }
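
/*
 * Sketched use during WAL replay, where only a RelFileNode is available and
 * a damaged page header is tolerable because redo will overwrite the page:
 *
 *      buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
 *                                      RBM_ZERO_ON_ERROR, NULL);
 */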
277
278
279 /*
280  * ReadBuffer_common -- common logic for all ReadBuffer variants
281  *
282  * *hit is set to true if the request was satisfied from shared buffer cache.
283  */
284 static Buffer
285 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
286                                   BlockNumber blockNum, ReadBufferMode mode,
287                                   BufferAccessStrategy strategy, bool *hit)
288 {
289         volatile BufferDesc *bufHdr;
290         Block           bufBlock;
291         bool            found;
292         bool            isExtend;
293         bool            isLocalBuf = SmgrIsTemp(smgr);
294
295         *hit = false;
296
297         /* Make sure we will have room to remember the buffer pin */
298         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
299
300         isExtend = (blockNum == P_NEW);
301
302         TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
303                                                                            smgr->smgr_rnode.node.spcNode,
304                                                                            smgr->smgr_rnode.node.dbNode,
305                                                                            smgr->smgr_rnode.node.relNode,
306                                                                            smgr->smgr_rnode.backend,
307                                                                            isExtend);
308
309         /* Substitute proper block number if caller asked for P_NEW */
310         if (isExtend)
311                 blockNum = smgrnblocks(smgr, forkNum);
312
313         if (isLocalBuf)
314         {
315                 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
316                 if (found)
317                         pgBufferUsage.local_blks_hit++;
318                 else
319                         pgBufferUsage.local_blks_read++;
320         }
321         else
322         {
323                 /*
324                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
325                  * not currently in memory.
326                  */
327                 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
328                                                          strategy, &found);
329                 if (found)
330                         pgBufferUsage.shared_blks_hit++;
331                 else
332                         pgBufferUsage.shared_blks_read++;
333         }
334
335         /* At this point we do NOT hold any locks. */
336
337         /* if it was already in the buffer pool, we're done */
338         if (found)
339         {
340                 if (!isExtend)
341                 {
342                         /* Just need to update stats before we exit */
343                         *hit = true;
344                         VacuumPageHit++;
345
346                         if (VacuumCostActive)
347                                 VacuumCostBalance += VacuumCostPageHit;
348
349                         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
350                                                                                           smgr->smgr_rnode.node.spcNode,
351                                                                                           smgr->smgr_rnode.node.dbNode,
352                                                                                           smgr->smgr_rnode.node.relNode,
353                                                                                           smgr->smgr_rnode.backend,
354                                                                                           isExtend,
355                                                                                           found);
356
357                         return BufferDescriptorGetBuffer(bufHdr);
358                 }
359
360                 /*
361                  * We get here only in the corner case where we are trying to extend
362                  * the relation but we found a pre-existing buffer marked BM_VALID.
363                  * This can happen because mdread doesn't complain about reads beyond
364                  * EOF (when zero_damaged_pages is ON) and so a previous attempt to
365                  * read a block beyond EOF could have left a "valid" zero-filled
366                  * buffer.      Unfortunately, we have also seen this case occurring
367                  * because of buggy Linux kernels that sometimes return an
368                  * lseek(SEEK_END) result that doesn't account for a recent write. In
369                  * that situation, the pre-existing buffer would contain valid data
370                  * that we don't want to overwrite.  Since the legitimate case should
371                  * always have left a zero-filled buffer, complain if not PageIsNew.
372                  */
373                 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
374                 if (!PageIsNew((Page) bufBlock))
375                         ereport(ERROR,
376                          (errmsg("unexpected data beyond EOF in block %u of relation %s",
377                                          blockNum, relpath(smgr->smgr_rnode, forkNum)),
378                           errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
379
380                 /*
381                  * We *must* do smgrextend before succeeding, else the page will not
382                  * be reserved by the kernel, and the next P_NEW call will decide to
383                  * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
384                  * call that BufferAlloc didn't, and proceed.
385                  */
386                 if (isLocalBuf)
387                 {
388                         /* Only need to adjust flags */
389                         Assert(bufHdr->flags & BM_VALID);
390                         bufHdr->flags &= ~BM_VALID;
391                 }
392                 else
393                 {
394                         /*
395                          * Loop to handle the very small possibility that someone re-sets
396                          * BM_VALID between our clearing it and StartBufferIO inspecting
397                          * it.
398                          */
399                         do
400                         {
401                                 LockBufHdr(bufHdr);
402                                 Assert(bufHdr->flags & BM_VALID);
403                                 bufHdr->flags &= ~BM_VALID;
404                                 UnlockBufHdr(bufHdr);
405                         } while (!StartBufferIO(bufHdr, true));
406                 }
407         }
408
409         /*
410          * if we have gotten to this point, we have allocated a buffer for the
411          * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
412          * if it's a shared buffer.
413          *
414          * Note: if smgrextend fails, we will end up with a buffer that is
415          * allocated but not marked BM_VALID.  P_NEW will still select the same
416          * block number (because the relation didn't get any longer on disk) and
417          * so future attempts to extend the relation will find the same buffer (if
418          * it's not been recycled) but come right back here to try smgrextend
419          * again.
420          */
421         Assert(!(bufHdr->flags & BM_VALID));            /* spinlock not needed */
422
423         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
424
425         if (isExtend)
426         {
427                 /* new buffers are zero-filled */
428                 MemSet((char *) bufBlock, 0, BLCKSZ);
429                 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
430         }
431         else
432         {
433                 /*
434                  * Read in the page, unless the caller intends to overwrite it and
435                  * just wants us to allocate a buffer.
436                  */
437                 if (mode == RBM_ZERO)
438                         MemSet((char *) bufBlock, 0, BLCKSZ);
439                 else
440                 {
441                         instr_time      io_start,
442                                                 io_time;
443
444                         if (track_io_timing)
445                                 INSTR_TIME_SET_CURRENT(io_start);
446
447                         smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
448
449                         if (track_io_timing)
450                         {
451                                 INSTR_TIME_SET_CURRENT(io_time);
452                                 INSTR_TIME_SUBTRACT(io_time, io_start);
453                                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
454                                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
455                         }
456
457                         /* check for garbage data */
458                         if (!PageHeaderIsValid((PageHeader) bufBlock))
459                         {
460                                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
461                                 {
462                                         ereport(WARNING,
463                                                         (errcode(ERRCODE_DATA_CORRUPTED),
464                                                          errmsg("invalid page header in block %u of relation %s; zeroing out page",
465                                                                         blockNum,
466                                                                         relpath(smgr->smgr_rnode, forkNum))));
467                                         MemSet((char *) bufBlock, 0, BLCKSZ);
468                                 }
469                                 else
470                                         ereport(ERROR,
471                                                         (errcode(ERRCODE_DATA_CORRUPTED),
472                                          errmsg("invalid page header in block %u of relation %s",
473                                                         blockNum,
474                                                         relpath(smgr->smgr_rnode, forkNum))));
475                         }
476                 }
477         }
478
479         if (isLocalBuf)
480         {
481                 /* Only need to adjust flags */
482                 bufHdr->flags |= BM_VALID;
483         }
484         else
485         {
486                 /* Set BM_VALID, terminate IO, and wake up any waiters */
487                 TerminateBufferIO(bufHdr, false, BM_VALID);
488         }
489
490         VacuumPageMiss++;
491         if (VacuumCostActive)
492                 VacuumCostBalance += VacuumCostPageMiss;
493
494         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
495                                                                           smgr->smgr_rnode.node.spcNode,
496                                                                           smgr->smgr_rnode.node.dbNode,
497                                                                           smgr->smgr_rnode.node.relNode,
498                                                                           smgr->smgr_rnode.backend,
499                                                                           isExtend,
500                                                                           found);
501
502         return BufferDescriptorGetBuffer(bufHdr);
503 }
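
/*
 * Seen from a caller, relation extension is just a read of P_NEW (a sketch;
 * the caller is expected to serialize extension, e.g. by holding the lock
 * taken with LockRelationForExtension):
 *
 *      buf = ReadBuffer(rel, P_NEW);
 *      blkno = BufferGetBlockNumber(buf);
 *      LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *      PageInit(BufferGetPage(buf), BufferGetPageSize(buf), 0);
 *      MarkBufferDirty(buf);
 *      UnlockReleaseBuffer(buf);
 */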
504
505 /*
506  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
507  *              buffer.  If no buffer exists already, selects a replacement
508  *              victim and evicts the old page, but does NOT read in new page.
509  *
510  * "strategy" can be a buffer replacement strategy object, or NULL for
511  * the default strategy.  The selected buffer's usage_count is advanced when
512  * using the default strategy, but otherwise possibly not (see PinBuffer).
513  *
514  * The returned buffer is pinned and is already marked as holding the
515  * desired page.  If it already did have the desired page, *foundPtr is
516  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
517  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
518  *
519  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
520  * we keep it for simplicity in ReadBuffer.
521  *
522  * No locks are held either at entry or exit.
523  */
524 static volatile BufferDesc *
525 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
526                         BlockNumber blockNum,
527                         BufferAccessStrategy strategy,
528                         bool *foundPtr)
529 {
530         BufferTag       newTag;                 /* identity of requested block */
531         uint32          newHash;                /* hash value for newTag */
532         LWLockId        newPartitionLock;               /* buffer partition lock for it */
533         BufferTag       oldTag;                 /* previous identity of selected buffer */
534         uint32          oldHash;                /* hash value for oldTag */
535         LWLockId        oldPartitionLock;               /* buffer partition lock for it */
536         BufFlags        oldFlags;
537         int                     buf_id;
538         volatile BufferDesc *buf;
539         bool            valid;
540
541         /* create a tag so we can lookup the buffer */
542         INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
543
544         /* determine its hash code and partition lock ID */
545         newHash = BufTableHashCode(&newTag);
546         newPartitionLock = BufMappingPartitionLock(newHash);
547
548         /* see if the block is in the buffer pool already */
549         LWLockAcquire(newPartitionLock, LW_SHARED);
550         buf_id = BufTableLookup(&newTag, newHash);
551         if (buf_id >= 0)
552         {
553                 /*
554                  * Found it.  Now, pin the buffer so no one can steal it from the
555                  * buffer pool, and check to see if the correct data has been loaded
556                  * into the buffer.
557                  */
558                 buf = &BufferDescriptors[buf_id];
559
560                 valid = PinBuffer(buf, strategy);
561
562                 /* Can release the mapping lock as soon as we've pinned it */
563                 LWLockRelease(newPartitionLock);
564
565                 *foundPtr = TRUE;
566
567                 if (!valid)
568                 {
569                         /*
570                          * We can only get here if (a) someone else is still reading in
571                          * the page, or (b) a previous read attempt failed.  We have to
572                          * wait for any active read attempt to finish, and then set up our
573                          * own read attempt if the page is still not BM_VALID.
574                          * StartBufferIO does it all.
575                          */
576                         if (StartBufferIO(buf, true))
577                         {
578                                 /*
579                                  * If we get here, previous attempts to read the buffer must
580                                  * have failed ... but we shall bravely try again.
581                                  */
582                                 *foundPtr = FALSE;
583                         }
584                 }
585
586                 return buf;
587         }
588
589         /*
590          * Didn't find it in the buffer pool.  We'll have to initialize a new
591          * buffer.      Remember to unlock the mapping lock while doing the work.
592          */
593         LWLockRelease(newPartitionLock);
594
595         /* Loop here in case we have to try another victim buffer */
596         for (;;)
597         {
598                 bool            lock_held;
599
600                 /*
601                  * Select a victim buffer.      The buffer is returned with its header
602                  * spinlock still held!  Also (in most cases) the BufFreelistLock is
603                  * still held, since it would be bad to hold the spinlock while
604                  * possibly waking up other processes.
605                  */
606                 buf = StrategyGetBuffer(strategy, &lock_held);
607
608                 Assert(buf->refcount == 0);
609
610                 /* Must copy buffer flags while we still hold the spinlock */
611                 oldFlags = buf->flags;
612
613                 /* Pin the buffer and then release the buffer spinlock */
614                 PinBuffer_Locked(buf);
615
616                 /* Now it's safe to release the freelist lock */
617                 if (lock_held)
618                         LWLockRelease(BufFreelistLock);
619
620                 /*
621                  * If the buffer was dirty, try to write it out.  There is a race
622                  * condition here, in that someone might dirty it after we released it
623                  * above, or even while we are writing it out (since our share-lock
624                  * won't prevent hint-bit updates).  We will recheck the dirty bit
625                  * after re-locking the buffer header.
626                  */
627                 if (oldFlags & BM_DIRTY)
628                 {
629                         /*
630                          * We need a share-lock on the buffer contents to write it out
631                          * (else we might write invalid data, eg because someone else is
632                          * compacting the page contents while we write).  We must use a
633                          * conditional lock acquisition here to avoid deadlock.  Even
634                          * though the buffer was not pinned (and therefore surely not
635                          * locked) when StrategyGetBuffer returned it, someone else could
636                          * have pinned and exclusive-locked it by the time we get here. If
637                          * we try to get the lock unconditionally, we'd block waiting for
638                          * them; if they later block waiting for us, deadlock ensues.
639                          * (This has been observed to happen when two backends are both
640                          * trying to split btree index pages, and the second one just
641                          * happens to be trying to split the page the first one got from
642                          * StrategyGetBuffer.)
643                          */
644                         if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
645                         {
646                                 /*
647                                  * If using a nondefault strategy, and writing the buffer
648                                  * would require a WAL flush, let the strategy decide whether
649                                  * to go ahead and write/reuse the buffer or to choose another
650                                  * victim.      We need lock to inspect the page LSN, so this
651                                  * can't be done inside StrategyGetBuffer.
652                                  */
653                                 if (strategy != NULL &&
654                                         XLogNeedsFlush(BufferGetLSN(buf)) &&
655                                         StrategyRejectBuffer(strategy, buf))
656                                 {
657                                         /* Drop lock/pin and loop around for another buffer */
658                                         LWLockRelease(buf->content_lock);
659                                         UnpinBuffer(buf, true);
660                                         continue;
661                                 }
662
663                                 /* OK, do the I/O */
664                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
665                                                                                            smgr->smgr_rnode.node.spcNode,
666                                                                                                 smgr->smgr_rnode.node.dbNode,
667                                                                                           smgr->smgr_rnode.node.relNode);
668
669                                 FlushBuffer(buf, NULL);
670                                 LWLockRelease(buf->content_lock);
671
672                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
673                                                                                            smgr->smgr_rnode.node.spcNode,
674                                                                                                 smgr->smgr_rnode.node.dbNode,
675                                                                                           smgr->smgr_rnode.node.relNode);
676                         }
677                         else
678                         {
679                                 /*
680                                  * Someone else has locked the buffer, so give it up and loop
681                                  * back to get another one.
682                                  */
683                                 UnpinBuffer(buf, true);
684                                 continue;
685                         }
686                 }
687
688                 /*
689                  * To change the association of a valid buffer, we'll need to have
690                  * exclusive lock on both the old and new mapping partitions.
691                  */
692                 if (oldFlags & BM_TAG_VALID)
693                 {
694                         /*
695                          * Need to compute the old tag's hashcode and partition lock ID.
696                          * XXX is it worth storing the hashcode in BufferDesc so we need
697                          * not recompute it here?  Probably not.
698                          */
699                         oldTag = buf->tag;
700                         oldHash = BufTableHashCode(&oldTag);
701                         oldPartitionLock = BufMappingPartitionLock(oldHash);
702
703                         /*
704                          * Must lock the lower-numbered partition first to avoid
705                          * deadlocks.
706                          */
707                         if (oldPartitionLock < newPartitionLock)
708                         {
709                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
710                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
711                         }
712                         else if (oldPartitionLock > newPartitionLock)
713                         {
714                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
715                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
716                         }
717                         else
718                         {
719                                 /* only one partition, only one lock */
720                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
721                         }
722                 }
723                 else
724                 {
725                         /* if it wasn't valid, we need only the new partition */
726                         LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
727                         /* these just keep the compiler quiet about uninit variables */
728                         oldHash = 0;
729                         oldPartitionLock = 0;
730                 }
731
732                 /*
733                  * Try to make a hashtable entry for the buffer under its new tag.
734                  * This could fail because while we were writing someone else
735                  * allocated another buffer for the same block we want to read in.
736                  * Note that we have not yet removed the hashtable entry for the old
737                  * tag.
738                  */
739                 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
740
741                 if (buf_id >= 0)
742                 {
743                         /*
744                          * Got a collision. Someone has already done what we were about to
745                          * do. We'll just handle this as if it were found in the buffer
746                          * pool in the first place.  First, give up the buffer we were
747                          * planning to use.
748                          */
749                         UnpinBuffer(buf, true);
750
751                         /* Can give up that buffer's mapping partition lock now */
752                         if ((oldFlags & BM_TAG_VALID) &&
753                                 oldPartitionLock != newPartitionLock)
754                                 LWLockRelease(oldPartitionLock);
755
756                         /* remaining code should match code at top of routine */
757
758                         buf = &BufferDescriptors[buf_id];
759
760                         valid = PinBuffer(buf, strategy);
761
762                         /* Can release the mapping lock as soon as we've pinned it */
763                         LWLockRelease(newPartitionLock);
764
765                         *foundPtr = TRUE;
766
767                         if (!valid)
768                         {
769                                 /*
770                                  * We can only get here if (a) someone else is still reading
771                                  * in the page, or (b) a previous read attempt failed.  We
772                                  * have to wait for any active read attempt to finish, and
773                                  * then set up our own read attempt if the page is still not
774                                  * BM_VALID.  StartBufferIO does it all.
775                                  */
776                                 if (StartBufferIO(buf, true))
777                                 {
778                                         /*
779                                          * If we get here, previous attempts to read the buffer
780                                          * must have failed ... but we shall bravely try again.
781                                          */
782                                         *foundPtr = FALSE;
783                                 }
784                         }
785
786                         return buf;
787                 }
788
789                 /*
790                  * Need to lock the buffer header too in order to change its tag.
791                  */
792                 LockBufHdr(buf);
793
794                 /*
795                  * Somebody could have pinned or re-dirtied the buffer while we were
796                  * doing the I/O and making the new hashtable entry.  If so, we can't
797                  * recycle this buffer; we must undo everything we've done and start
798                  * over with a new victim buffer.
799                  */
800                 oldFlags = buf->flags;
801                 if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
802                         break;
803
804                 UnlockBufHdr(buf);
805                 BufTableDelete(&newTag, newHash);
806                 if ((oldFlags & BM_TAG_VALID) &&
807                         oldPartitionLock != newPartitionLock)
808                         LWLockRelease(oldPartitionLock);
809                 LWLockRelease(newPartitionLock);
810                 UnpinBuffer(buf, true);
811         }
812
813         /*
814          * Okay, it's finally safe to rename the buffer.
815          *
816          * Clearing BM_VALID here is necessary, clearing the dirtybits is just
817          * paranoia.  We also reset the usage_count since any recency of use of
818          * the old content is no longer relevant.  (The usage_count starts out at
819          * 1 so that the buffer can survive one clock-sweep pass.)
820          */
821         buf->tag = newTag;
822         buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
823         if (relpersistence == RELPERSISTENCE_PERMANENT)
824                 buf->flags |= BM_TAG_VALID | BM_PERMANENT;
825         else
826                 buf->flags |= BM_TAG_VALID;
827         buf->usage_count = 1;
828
829         UnlockBufHdr(buf);
830
831         if (oldFlags & BM_TAG_VALID)
832         {
833                 BufTableDelete(&oldTag, oldHash);
834                 if (oldPartitionLock != newPartitionLock)
835                         LWLockRelease(oldPartitionLock);
836         }
837
838         LWLockRelease(newPartitionLock);
839
840         /*
841          * Buffer contents are currently invalid.  Try to get the io_in_progress
842          * lock.  If StartBufferIO returns false, then someone else managed to
843          * read it before we did, so there's nothing left for BufferAlloc() to do.
844          */
845         if (StartBufferIO(buf, true))
846                 *foundPtr = FALSE;
847         else
848                 *foundPtr = TRUE;
849
850         return buf;
851 }
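
/*
 * In outline, the replacement path above is (a sketch that ignores the
 * hashtable-collision and re-dirtied retry loops):
 *
 *      buf = StrategyGetBuffer(...)  -- returned spinlocked; pin it
 *      if it was dirty, FlushBuffer(buf, NULL) under a shared content lock
 *      lock the old and new mapping partitions (lower-numbered first)
 *      BufTableInsert(&newTag, ...), retag the header, reset flags/usage_count
 *      BufTableDelete(&oldTag, ...), release the mapping locks
 *      StartBufferIO(buf, true) unless someone else read the page meanwhile
 */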
852
853 /*
854  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
855  * freelist.
856  *
857  * The buffer header spinlock must be held at entry.  We drop it before
858  * returning.  (This is sane because the caller must have locked the
859  * buffer in order to be sure it should be dropped.)
860  *
861  * This is used only in contexts such as dropping a relation.  We assume
862  * that no other backend could possibly be interested in using the page,
863  * so the only reason the buffer might be pinned is if someone else is
864  * trying to write it out.      We have to let them finish before we can
865  * reclaim the buffer.
866  *
867  * The buffer could get reclaimed by someone else while we are waiting
868  * to acquire the necessary locks; if so, don't mess it up.
869  */
870 static void
871 InvalidateBuffer(volatile BufferDesc *buf)
872 {
873         BufferTag       oldTag;
874         uint32          oldHash;                /* hash value for oldTag */
875         LWLockId        oldPartitionLock;               /* buffer partition lock for it */
876         BufFlags        oldFlags;
877
878         /* Save the original buffer tag before dropping the spinlock */
879         oldTag = buf->tag;
880
881         UnlockBufHdr(buf);
882
883         /*
884          * Need to compute the old tag's hashcode and partition lock ID. XXX is it
885          * worth storing the hashcode in BufferDesc so we need not recompute it
886          * here?  Probably not.
887          */
888         oldHash = BufTableHashCode(&oldTag);
889         oldPartitionLock = BufMappingPartitionLock(oldHash);
890
891 retry:
892
893         /*
894          * Acquire exclusive mapping lock in preparation for changing the buffer's
895          * association.
896          */
897         LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
898
899         /* Re-lock the buffer header */
900         LockBufHdr(buf);
901
902         /* If it's changed while we were waiting for lock, do nothing */
903         if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
904         {
905                 UnlockBufHdr(buf);
906                 LWLockRelease(oldPartitionLock);
907                 return;
908         }
909
910         /*
911          * We assume the only reason for it to be pinned is that someone else is
912          * flushing the page out.  Wait for them to finish.  (This could be an
913          * infinite loop if the refcount is messed up... it would be nice to time
914          * out after awhile, but there seems no way to be sure how many loops may
915          * be needed.  Note that if the other guy has pinned the buffer but not
916          * yet done StartBufferIO, WaitIO will fall through and we'll effectively
917          * be busy-looping here.)
918          */
919         if (buf->refcount != 0)
920         {
921                 UnlockBufHdr(buf);
922                 LWLockRelease(oldPartitionLock);
923                 /* safety check: should definitely not be our *own* pin */
924                 if (PrivateRefCount[buf->buf_id] != 0)
925                         elog(ERROR, "buffer is pinned in InvalidateBuffer");
926                 WaitIO(buf);
927                 goto retry;
928         }
929
930         /*
931          * Clear out the buffer's tag and flags.  We must do this to ensure that
932          * linear scans of the buffer array don't think the buffer is valid.
933          */
934         oldFlags = buf->flags;
935         CLEAR_BUFFERTAG(buf->tag);
936         buf->flags = 0;
937         buf->usage_count = 0;
938
939         UnlockBufHdr(buf);
940
941         /*
942          * Remove the buffer from the lookup hashtable, if it was in there.
943          */
944         if (oldFlags & BM_TAG_VALID)
945                 BufTableDelete(&oldTag, oldHash);
946
947         /*
948          * Done with mapping lock.
949          */
950         LWLockRelease(oldPartitionLock);
951
952         /*
953          * Insert the buffer at the head of the list of free buffers.
954          */
955         StrategyFreeBuffer(buf);
956 }
957
958 /*
959  * MarkBufferDirty
960  *
961  *              Marks buffer contents as dirty (actual write happens later).
962  *
963  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
964  * exclusive lock, then somebody could be in process of writing the buffer,
965  * leading to risk of bad data written to disk.)
966  */
967 void
968 MarkBufferDirty(Buffer buffer)
969 {
970         volatile BufferDesc *bufHdr;
971
972         if (!BufferIsValid(buffer))
973                 elog(ERROR, "bad buffer ID: %d", buffer);
974
975         if (BufferIsLocal(buffer))
976         {
977                 MarkLocalBufferDirty(buffer);
978                 return;
979         }
980
981         bufHdr = &BufferDescriptors[buffer - 1];
982
983         Assert(PrivateRefCount[buffer - 1] > 0);
984         /* unfortunately we can't check if the lock is held exclusively */
985         Assert(LWLockHeldByMe(bufHdr->content_lock));
986
987         LockBufHdr(bufHdr);
988
989         Assert(bufHdr->refcount > 0);
990
991         /*
992          * If the buffer was not dirty already, do vacuum accounting.
993          */
994         if (!(bufHdr->flags & BM_DIRTY))
995         {
996                 VacuumPageDirty++;
997                 pgBufferUsage.shared_blks_dirtied++;
998                 if (VacuumCostActive)
999                         VacuumCostBalance += VacuumCostPageDirty;
1000         }
1001
1002         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1003
1004         UnlockBufHdr(bufHdr);
1005 }
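
/*
 * In WAL-logged code paths the dirtying conventionally happens inside a
 * critical section, alongside the XLOG insertion, roughly like this (a
 * sketch; construction of the XLogRecData chain is omitted):
 *
 *      START_CRIT_SECTION();
 *      ... apply the change to the page ...
 *      MarkBufferDirty(buffer);
 *      recptr = XLogInsert(rmid, info, rdata);
 *      PageSetLSN(page, recptr);
 *      END_CRIT_SECTION();
 */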
1006
1007 /*
1008  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1009  *
1010  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1011  * compared to calling the two routines separately.  Now it's mainly just
1012  * a convenience function.      However, if the passed buffer is valid and
1013  * already contains the desired block, we just return it as-is; and that
1014  * does save considerable work compared to a full release and reacquire.
1015  *
1016  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1017  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1018  * but can save some tests in the caller.
1019  */
1020 Buffer
1021 ReleaseAndReadBuffer(Buffer buffer,
1022                                          Relation relation,
1023                                          BlockNumber blockNum)
1024 {
1025         ForkNumber      forkNum = MAIN_FORKNUM;
1026         volatile BufferDesc *bufHdr;
1027
1028         if (BufferIsValid(buffer))
1029         {
1030                 if (BufferIsLocal(buffer))
1031                 {
1032                         Assert(LocalRefCount[-buffer - 1] > 0);
1033                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
1034                         if (bufHdr->tag.blockNum == blockNum &&
1035                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1036                                 bufHdr->tag.forkNum == forkNum)
1037                                 return buffer;
1038                         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1039                         LocalRefCount[-buffer - 1]--;
1040                 }
1041                 else
1042                 {
1043                         Assert(PrivateRefCount[buffer - 1] > 0);
1044                         bufHdr = &BufferDescriptors[buffer - 1];
1045                         /* we have pin, so it's ok to examine tag without spinlock */
1046                         if (bufHdr->tag.blockNum == blockNum &&
1047                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1048                                 bufHdr->tag.forkNum == forkNum)
1049                                 return buffer;
1050                         UnpinBuffer(bufHdr, true);
1051                 }
1052         }
1053
1054         return ReadBuffer(relation, blockNum);
1055 }
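
/*
 * Sketch of the intended call pattern: walking a chain of blocks while
 * holding at most one pin at a time:
 *
 *      Buffer      buf = InvalidBuffer;
 *
 *      while (BlockNumberIsValid(blkno))
 *      {
 *          buf = ReleaseAndReadBuffer(buf, rel, blkno);
 *          LockBuffer(buf, BUFFER_LOCK_SHARE);
 *          ... examine the page, compute the next blkno ...
 *          LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *      }
 *      if (BufferIsValid(buf))
 *          ReleaseBuffer(buf);
 */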
1056
1057 /*
1058  * PinBuffer -- make buffer unavailable for replacement.
1059  *
1060  * For the default access strategy, the buffer's usage_count is incremented
1061  * when we first pin it; for other strategies we just make sure the usage_count
1062  * isn't zero.  (The idea of the latter is that we don't want synchronized
1063  * heap scans to inflate the count, but we need it to not be zero to discourage
1064  * other backends from stealing buffers from our ring.  As long as we cycle
1065  * through the ring faster than the global clock-sweep cycles, buffers in
1066  * our ring won't be chosen as victims for replacement by other backends.)
1067  *
1068  * This should be applied only to shared buffers, never local ones.
1069  *
1070  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1071  *
1072  * Returns TRUE if buffer is BM_VALID, else FALSE.      This provision allows
1073  * some callers to avoid an extra spinlock cycle.
1074  */
1075 static bool
1076 PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
1077 {
1078         int                     b = buf->buf_id;
1079         bool            result;
1080
1081         if (PrivateRefCount[b] == 0)
1082         {
1083                 LockBufHdr(buf);
1084                 buf->refcount++;
1085                 if (strategy == NULL)
1086                 {
1087                         if (buf->usage_count < BM_MAX_USAGE_COUNT)
1088                                 buf->usage_count++;
1089                 }
1090                 else
1091                 {
1092                         if (buf->usage_count == 0)
1093                                 buf->usage_count = 1;
1094                 }
1095                 result = (buf->flags & BM_VALID) != 0;
1096                 UnlockBufHdr(buf);
1097         }
1098         else
1099         {
1100                 /* If we previously pinned the buffer, it must surely be valid */
1101                 result = true;
1102         }
1103         PrivateRefCount[b]++;
1104         Assert(PrivateRefCount[b] > 0);
1105         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1106                                                                 BufferDescriptorGetBuffer(buf));
1107         return result;
1108 }
1109
1110 /*
1111  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1112  * The spinlock is released before return.
1113  *
1114  * Currently, no callers of this function want to modify the buffer's
1115  * usage_count at all, so there's no need for a strategy parameter.
1116  * Also we don't bother with a BM_VALID test (the caller could check that for
1117  * itself).
1118  *
1119  * Note: use of this routine is frequently mandatory, not just an optimization
1120  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1121  * its state can change under us.
1122  */
1123 static void
1124 PinBuffer_Locked(volatile BufferDesc *buf)
1125 {
1126         int                     b = buf->buf_id;
1127
1128         if (PrivateRefCount[b] == 0)
1129                 buf->refcount++;
1130         UnlockBufHdr(buf);
1131         PrivateRefCount[b]++;
1132         Assert(PrivateRefCount[b] > 0);
1133         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1134                                                                 BufferDescriptorGetBuffer(buf));
1135 }
1136
1137 /*
1138  * UnpinBuffer -- make buffer available for replacement.
1139  *
1140  * This should be applied only to shared buffers, never local ones.
1141  *
1142  * Most but not all callers want CurrentResourceOwner to be adjusted.
1143  * Those that don't should pass fixOwner = FALSE.
1144  */
1145 static void
1146 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
1147 {
1148         int                     b = buf->buf_id;
1149
1150         if (fixOwner)
1151                 ResourceOwnerForgetBuffer(CurrentResourceOwner,
1152                                                                   BufferDescriptorGetBuffer(buf));
1153
1154         Assert(PrivateRefCount[b] > 0);
1155         PrivateRefCount[b]--;
1156         if (PrivateRefCount[b] == 0)
1157         {
1158                 /* I'd better not still hold any locks on the buffer */
1159                 Assert(!LWLockHeldByMe(buf->content_lock));
1160                 Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
1161
1162                 LockBufHdr(buf);
1163
1164                 /* Decrement the shared reference count */
1165                 Assert(buf->refcount > 0);
1166                 buf->refcount--;
1167
1168                 /* Support LockBufferForCleanup() */
1169                 if ((buf->flags & BM_PIN_COUNT_WAITER) &&
1170                         buf->refcount == 1)
1171                 {
1172                         /* we just released the last pin other than the waiter's */
1173                         int                     wait_backend_pid = buf->wait_backend_pid;
1174
1175                         buf->flags &= ~BM_PIN_COUNT_WAITER;
1176                         UnlockBufHdr(buf);
1177                         ProcSendSignal(wait_backend_pid);
1178                 }
1179                 else
1180                         UnlockBufHdr(buf);
1181         }
1182 }
1183
1184 /*
1185  * BufferSync -- Write out all dirty buffers in the pool.
1186  *
1187  * This is called at checkpoint time to write out all dirty shared buffers.
1188  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1189  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN is
1190  * set, we write even unlogged buffers, which are otherwise skipped.  The
1191  * remaining flags currently have no effect here.
1192  */
1193 static void
1194 BufferSync(int flags)
1195 {
1196         int                     buf_id;
1197         int                     num_to_scan;
1198         int                     num_to_write;
1199         int                     num_written;
1200         int                     mask = BM_DIRTY;
1201
1202         /* Make sure we can handle the pin inside SyncOneBuffer */
1203         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1204
1205         /*
1206          * Unless this is a shutdown checkpoint, we write only permanent, dirty
1207          * buffers.  But at shutdown time, we write all dirty buffers.
1208          */
1209         if (!(flags & CHECKPOINT_IS_SHUTDOWN))
1210                 mask |= BM_PERMANENT;
1211
1212         /*
1213          * Loop over all buffers, and mark the ones that need to be written with
1214          * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
1215          * can estimate how much work needs to be done.
1216          *
1217          * This allows us to write only those pages that were dirty when the
1218          * checkpoint began, and not those that get dirtied while it proceeds.
1219          * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1220          * later in this function, or by normal backends or the bgwriter cleaning
1221          * scan, the flag is cleared.  Any buffer dirtied after this point won't
1222          * have the flag set.
1223          *
1224          * Note that if we fail to write some buffer, we may leave buffers with
1225          * BM_CHECKPOINT_NEEDED still set.      This is OK since any such buffer would
1226          * certainly need to be written for the next checkpoint attempt, too.
1227          */
1228         num_to_write = 0;
1229         for (buf_id = 0; buf_id < NBuffers; buf_id++)
1230         {
1231                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1232
1233                 /*
1234                  * Header spinlock is enough to examine BM_DIRTY, see comment in
1235                  * SyncOneBuffer.
1236                  */
1237                 LockBufHdr(bufHdr);
1238
1239                 if ((bufHdr->flags & mask) == mask)
1240                 {
1241                         bufHdr->flags |= BM_CHECKPOINT_NEEDED;
1242                         num_to_write++;
1243                 }
1244
1245                 UnlockBufHdr(bufHdr);
1246         }
1247
1248         if (num_to_write == 0)
1249                 return;                                 /* nothing to do */
1250
1251         TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
1252
1253         /*
1254          * Loop over all buffers again, and write the ones (still) marked with
1255          * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
1256          * since we might as well dump soon-to-be-recycled buffers first.
1257          *
1258          * Note that we don't read the buffer alloc count here --- that should be
1259          * left untouched till the next BgBufferSync() call.
1260          */
1261         buf_id = StrategySyncStart(NULL, NULL);
1262         num_to_scan = NBuffers;
1263         num_written = 0;
1264         while (num_to_scan-- > 0)
1265         {
1266                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1267
1268                 /*
1269                  * We don't need to acquire the lock here, because we're only looking
1270                  * at a single bit. It's possible that someone else writes the buffer
1271                  * and clears the flag right after we check, but that doesn't matter
1272                  * since SyncOneBuffer will then do nothing.  However, there is a
1273                  * further race condition: it's conceivable that between the time we
1274                  * examine the bit here and the time SyncOneBuffer acquires lock,
1275                  * someone else not only wrote the buffer but replaced it with another
1276                  * page and dirtied it.  In that improbable case, SyncOneBuffer will
1277                  * write the buffer though we didn't need to.  It doesn't seem worth
1278                  * guarding against this, though.
1279                  */
1280                 if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
1281                 {
1282                         if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
1283                         {
1284                                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1285                                 BgWriterStats.m_buf_written_checkpoints++;
1286                                 num_written++;
1287
1288                                 /*
1289                                  * We know there are at most num_to_write buffers with
1290                                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
1291                                  * num_written reaches num_to_write.
1292                                  *
1293                                  * Note that num_written doesn't include buffers written by
1294                                  * other backends, or by the bgwriter cleaning scan. That
1295                                  * means that the estimate of how much progress we've made is
1296                                  * conservative, and also that this test will often fail to
1297                                  * trigger.  But it seems worth making anyway.
1298                                  */
1299                                 if (num_written >= num_to_write)
1300                                         break;
1301
1302                                 /*
1303                                  * Sleep to throttle our I/O rate.
1304                                  */
1305                                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
1306                         }
1307                 }
1308
1309                 if (++buf_id >= NBuffers)
1310                         buf_id = 0;
1311         }
1312
1313         /*
1314          * Update checkpoint statistics. As noted above, this doesn't include
1315          * buffers written by other backends or bgwriter scan.
1316          */
1317         CheckpointStats.ckpt_bufs_written += num_written;
1318
1319         TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
1320 }
1321
1322 /*
1323  * BgBufferSync -- Write out some dirty buffers in the pool.
1324  *
1325  * This is called periodically by the background writer process.
1326  *
1327  * Returns true if it's appropriate for the bgwriter process to go into
1328  * low-power hibernation mode.  (This happens if the strategy clock sweep
1329  * has been "lapped" and no buffer allocations have occurred recently,
1330  * or if the bgwriter has been effectively disabled by setting
1331  * bgwriter_lru_maxpages to 0.)
1332  */
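/*
 * Illustrative note (a pointer, not a specification): the return value is
 * consumed by the background writer's main loop, which may then sleep for a
 * longer interval until something wakes it; see postmaster/bgwriter.c for
 * the actual hibernation logic.
 */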
1333 bool
1334 BgBufferSync(void)
1335 {
1336         /* info obtained from freelist.c */
1337         int                     strategy_buf_id;
1338         uint32          strategy_passes;
1339         uint32          recent_alloc;
1340
1341         /*
1342          * Information saved between calls so we can determine the strategy
1343          * point's advance rate and avoid scanning already-cleaned buffers.
1344          */
1345         static bool saved_info_valid = false;
1346         static int      prev_strategy_buf_id;
1347         static uint32 prev_strategy_passes;
1348         static int      next_to_clean;
1349         static uint32 next_passes;
1350
1351         /* Moving averages of allocation rate and clean-buffer density */
1352         static float smoothed_alloc = 0;
1353         static float smoothed_density = 10.0;
1354
1355         /* Potentially these could be tunables, but for now, not */
1356         float           smoothing_samples = 16;
1357         float           scan_whole_pool_milliseconds = 120000.0;
1358
1359         /* Used to compute how far we scan ahead */
1360         long            strategy_delta;
1361         int                     bufs_to_lap;
1362         int                     bufs_ahead;
1363         float           scans_per_alloc;
1364         int                     reusable_buffers_est;
1365         int                     upcoming_alloc_est;
1366         int                     min_scan_buffers;
1367
1368         /* Variables for the scanning loop proper */
1369         int                     num_to_scan;
1370         int                     num_written;
1371         int                     reusable_buffers;
1372
1373         /* Variables for final smoothed_density update */
1374         long            new_strategy_delta;
1375         uint32          new_recent_alloc;
1376
1377         /*
1378          * Find out where the freelist clock sweep currently is, and how many
1379          * buffer allocations have happened since our last call.
1380          */
1381         strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
1382
1383         /* Report buffer alloc counts to pgstat */
1384         BgWriterStats.m_buf_alloc += recent_alloc;
1385
1386         /*
1387          * If we're not running the LRU scan, just stop after doing the stats
1388          * stuff.  We mark the saved state invalid so that we can recover sanely
1389          * if LRU scan is turned back on later.
1390          */
1391         if (bgwriter_lru_maxpages <= 0)
1392         {
1393                 saved_info_valid = false;
1394                 return true;
1395         }
1396
1397         /*
1398          * Compute strategy_delta = how many buffers have been scanned by the
1399          * clock sweep since last time.  If first time through, assume none. Then
1400          * see if we are still ahead of the clock sweep, and if so, how many
1401          * buffers we could scan before we'd catch up with it and "lap" it. Note:
1402          * the weird-looking coding of the xxx_passes comparisons is to avoid bogus
1403          * behavior when the passes counts wrap around.
1404          */
1405         if (saved_info_valid)
1406         {
1407                 int32           passes_delta = strategy_passes - prev_strategy_passes;
1408
1409                 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
1410                 strategy_delta += (long) passes_delta * NBuffers;
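                /*
                 * Worked example (illustrative numbers only): with NBuffers = 4096,
                 * a previous position of passes = 5, buf_id = 3000 and a current
                 * position of passes = 6, buf_id = 200, we get
                 * strategy_delta = (200 - 3000) + 1 * 4096 = 1296 buffers scanned
                 * by the clock sweep since the previous call.
                 */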
1411
1412                 Assert(strategy_delta >= 0);
1413
1414                 if ((int32) (next_passes - strategy_passes) > 0)
1415                 {
1416                         /* we're one pass ahead of the strategy point */
1417                         bufs_to_lap = strategy_buf_id - next_to_clean;
1418 #ifdef BGW_DEBUG
1419                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1420                                  next_passes, next_to_clean,
1421                                  strategy_passes, strategy_buf_id,
1422                                  strategy_delta, bufs_to_lap);
1423 #endif
1424                 }
1425                 else if (next_passes == strategy_passes &&
1426                                  next_to_clean >= strategy_buf_id)
1427                 {
1428                         /* on same pass, but ahead or at least not behind */
1429                         bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
1430 #ifdef BGW_DEBUG
1431                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1432                                  next_passes, next_to_clean,
1433                                  strategy_passes, strategy_buf_id,
1434                                  strategy_delta, bufs_to_lap);
1435 #endif
1436                 }
1437                 else
1438                 {
1439                         /*
1440                          * We're behind, so skip forward to the strategy point and start
1441                          * cleaning from there.
1442                          */
1443 #ifdef BGW_DEBUG
1444                         elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
1445                                  next_passes, next_to_clean,
1446                                  strategy_passes, strategy_buf_id,
1447                                  strategy_delta);
1448 #endif
1449                         next_to_clean = strategy_buf_id;
1450                         next_passes = strategy_passes;
1451                         bufs_to_lap = NBuffers;
1452                 }
1453         }
1454         else
1455         {
1456                 /*
1457                  * Initializing at startup or after LRU scanning had been off. Always
1458                  * start at the strategy point.
1459                  */
1460 #ifdef BGW_DEBUG
1461                 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
1462                          strategy_passes, strategy_buf_id);
1463 #endif
1464                 strategy_delta = 0;
1465                 next_to_clean = strategy_buf_id;
1466                 next_passes = strategy_passes;
1467                 bufs_to_lap = NBuffers;
1468         }
1469
1470         /* Update saved info for next time */
1471         prev_strategy_buf_id = strategy_buf_id;
1472         prev_strategy_passes = strategy_passes;
1473         saved_info_valid = true;
1474
1475         /*
1476          * Compute how many buffers had to be scanned for each new allocation, ie,
1477          * 1/density of reusable buffers, and track a moving average of that.
1478          *
1479          * If the strategy point didn't move, we don't update the density estimate.
1480          */
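        /*
         * Worked example (illustrative): if the clock sweep advanced 320 buffers
         * while 20 allocations occurred, scans_per_alloc = 16, and with
         * smoothing_samples = 16 the density estimate moves 1/16 of the way from
         * its previous value toward 16.
         */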
1481         if (strategy_delta > 0 && recent_alloc > 0)
1482         {
1483                 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
1484                 smoothed_density += (scans_per_alloc - smoothed_density) /
1485                         smoothing_samples;
1486         }
1487
1488         /*
1489          * Estimate how many reusable buffers there are between the current
1490          * strategy point and where we've scanned ahead to, based on the smoothed
1491          * density estimate.
1492          */
1493         bufs_ahead = NBuffers - bufs_to_lap;
1494         reusable_buffers_est = (float) bufs_ahead / smoothed_density;
1495
1496         /*
1497          * Track a moving average of recent buffer allocations.  Here, rather than
1498          * a true average we want a fast-attack, slow-decline behavior: we
1499          * immediately follow any increase.
1500          */
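        /*
         * Worked example (illustrative): if recent_alloc jumps from 10 to 500,
         * smoothed_alloc snaps to 500 immediately; if allocations then fall back
         * to 10, smoothed_alloc declines by (500 - 10) / 16, roughly 31, on the
         * next call and keeps decaying gradually from there.
         */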
1501         if (smoothed_alloc <= (float) recent_alloc)
1502                 smoothed_alloc = recent_alloc;
1503         else
1504                 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
1505                         smoothing_samples;
1506
1507         /* Scale the estimate by a GUC to allow more aggressive tuning. */
1508         upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
1509
1510         /*
1511          * If recent_alloc remains at zero for many cycles, smoothed_alloc will
1512          * eventually underflow to zero, and the underflows produce annoying
1513          * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
1514          * zero, there's no point in tracking smaller and smaller values of
1515          * smoothed_alloc, so just reset it to exactly zero to avoid this
1516          * syndrome.  It will pop back up as soon as recent_alloc increases.
1517          */
1518         if (upcoming_alloc_est == 0)
1519                 smoothed_alloc = 0;
1520
1521         /*
1522          * Even in cases where there's been little or no buffer allocation
1523          * activity, we want to make a small amount of progress through the buffer
1524          * cache so that as many reusable buffers as possible are clean after an
1525          * idle period.
1526          *
1527          * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
1528          * the BGW will be called during the scan_whole_pool time; slice the
1529          * buffer pool into that many sections.
1530          */
1531         min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
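        /*
         * Worked example (illustrative, assuming NBuffers = 4096 and the default
         * bgwriter_delay of 200 ms): 120000 / 200 = 600 bgwriter rounds to cover
         * the whole pool, so min_scan_buffers = 4096 / 600 = 6 buffers per round
         * after integer truncation.
         */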
1532
1533         if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
1534         {
1535 #ifdef BGW_DEBUG
1536                 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
1537                          upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
1538 #endif
1539                 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
1540         }
1541
1542         /*
1543          * Now write out dirty reusable buffers, working forward from the
1544          * next_to_clean point, until we have lapped the strategy scan, or cleaned
1545          * enough buffers to match our estimate of the next cycle's allocation
1546          * requirements, or hit the bgwriter_lru_maxpages limit.
1547          */
1548
1549         /* Make sure we can handle the pin inside SyncOneBuffer */
1550         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1551
1552         num_to_scan = bufs_to_lap;
1553         num_written = 0;
1554         reusable_buffers = reusable_buffers_est;
1555
1556         /* Execute the LRU scan */
1557         while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
1558         {
1559                 int                     buffer_state = SyncOneBuffer(next_to_clean, true);
1560
1561                 if (++next_to_clean >= NBuffers)
1562                 {
1563                         next_to_clean = 0;
1564                         next_passes++;
1565                 }
1566                 num_to_scan--;
1567
1568                 if (buffer_state & BUF_WRITTEN)
1569                 {
1570                         reusable_buffers++;
1571                         if (++num_written >= bgwriter_lru_maxpages)
1572                         {
1573                                 BgWriterStats.m_maxwritten_clean++;
1574                                 break;
1575                         }
1576                 }
1577                 else if (buffer_state & BUF_REUSABLE)
1578                         reusable_buffers++;
1579         }
1580
1581         BgWriterStats.m_buf_written_clean += num_written;
1582
1583 #ifdef BGW_DEBUG
1584         elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
1585                  recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
1586                  smoothed_density, reusable_buffers_est, upcoming_alloc_est,
1587                  bufs_to_lap - num_to_scan,
1588                  num_written,
1589                  reusable_buffers - reusable_buffers_est);
1590 #endif
1591
1592         /*
1593          * Consider the above scan as being like a new allocation scan.
1594          * Characterize its density and update the smoothed one based on it. This
1595          * effectively halves the moving average period in cases where both the
1596          * strategy and the background writer are doing some useful scanning,
1597          * which is helpful because a long memory isn't as desirable on the
1598          * density estimates.
1599          */
1600         new_strategy_delta = bufs_to_lap - num_to_scan;
1601         new_recent_alloc = reusable_buffers - reusable_buffers_est;
1602         if (new_strategy_delta > 0 && new_recent_alloc > 0)
1603         {
1604                 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
1605                 smoothed_density += (scans_per_alloc - smoothed_density) /
1606                         smoothing_samples;
1607
1608 #ifdef BGW_DEBUG
1609                 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
1610                          new_recent_alloc, new_strategy_delta,
1611                          scans_per_alloc, smoothed_density);
1612 #endif
1613         }
1614
1615         /* Return true if OK to hibernate */
1616         return (bufs_to_lap == 0 && recent_alloc == 0);
1617 }
1618
1619 /*
1620  * SyncOneBuffer -- process a single buffer during syncing.
1621  *
1622  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
1623  * buffers marked recently used, as these are not replacement candidates.
1624  *
1625  * Returns a bitmask containing the following flag bits:
1626  *      BUF_WRITTEN: we wrote the buffer.
1627  *      BUF_REUSABLE: buffer is available for replacement, ie, it has
1628  *              pin count 0 and usage count 0.
1629  *
1630  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
1631  * after locking it, but we don't care all that much.)
1632  *
1633  * Note: caller must have done ResourceOwnerEnlargeBuffers.
1634  */
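/*
 * Illustrative interpretation of the result bits (a sketch; BufferSync and
 * BgBufferSync above are the real callers):
 *
 *              int             rc = SyncOneBuffer(buf_id, true);
 *
 *              if (rc & BUF_WRITTEN)
 *                      ... a dirty page was handed to the kernel via FlushBuffer ...
 *              if (rc & BUF_REUSABLE)
 *                      ... the buffer had pin count 0 and usage count 0 when examined ...
 */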
1635 static int
1636 SyncOneBuffer(int buf_id, bool skip_recently_used)
1637 {
1638         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1639         int                     result = 0;
1640
1641         /*
1642          * Check whether buffer needs writing.
1643          *
1644          * We can make this check without taking the buffer content lock so long
1645          * as we mark pages dirty in access methods *before* logging changes with
1646          * XLogInsert(): if someone marks the buffer dirty just after our check we
1647          * don't worry, because our checkpoint.redo point precedes the log record for
1648          * the upcoming changes, so we are not required to write such a dirty buffer.
1649          */
1650         LockBufHdr(bufHdr);
1651
1652         if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
1653                 result |= BUF_REUSABLE;
1654         else if (skip_recently_used)
1655         {
1656                 /* Caller told us not to write recently-used buffers */
1657                 UnlockBufHdr(bufHdr);
1658                 return result;
1659         }
1660
1661         if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
1662         {
1663                 /* It's clean, so nothing to do */
1664                 UnlockBufHdr(bufHdr);
1665                 return result;
1666         }
1667
1668         /*
1669          * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
1670          * buffer is clean by the time we've locked it.)
1671          */
1672         PinBuffer_Locked(bufHdr);
1673         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
1674
1675         FlushBuffer(bufHdr, NULL);
1676
1677         LWLockRelease(bufHdr->content_lock);
1678         UnpinBuffer(bufHdr, true);
1679
1680         return result | BUF_WRITTEN;
1681 }
1682
1683
1684 /*
1685  *              AtEOXact_Buffers - clean up at end of transaction.
1686  *
1687  *              As of PostgreSQL 8.0, buffer pins should get released by the
1688  *              ResourceOwner mechanism.  This routine is just a debugging
1689  *              cross-check that no pins remain.
1690  */
1691 void
1692 AtEOXact_Buffers(bool isCommit)
1693 {
1694 #ifdef USE_ASSERT_CHECKING
1695         if (assert_enabled)
1696         {
1697                 int                     i;
1698
1699                 for (i = 0; i < NBuffers; i++)
1700                 {
1701                         Assert(PrivateRefCount[i] == 0);
1702                 }
1703         }
1704 #endif
1705
1706         AtEOXact_LocalBuffers(isCommit);
1707 }
1708
1709 /*
1710  * InitBufferPoolBackend --- second-stage initialization of a new backend
1711  *
1712  * This is called after we have acquired a PGPROC and so can safely get
1713  * LWLocks.  We don't currently need to do anything at this stage ...
1714  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
1715  * access, and thereby has to be called at the corresponding phase of
1716  * backend shutdown.
1717  */
1718 void
1719 InitBufferPoolBackend(void)
1720 {
1721         on_shmem_exit(AtProcExit_Buffers, 0);
1722 }
1723
1724 /*
1725  * During backend exit, ensure that we released all shared-buffer locks and
1726  * assert that we have no remaining pins.
1727  */
1728 static void
1729 AtProcExit_Buffers(int code, Datum arg)
1730 {
1731         AbortBufferIO();
1732         UnlockBuffers();
1733
1734 #ifdef USE_ASSERT_CHECKING
1735         if (assert_enabled)
1736         {
1737                 int                     i;
1738
1739                 for (i = 0; i < NBuffers; i++)
1740                 {
1741                         Assert(PrivateRefCount[i] == 0);
1742                 }
1743         }
1744 #endif
1745
1746         /* localbuf.c needs a chance too */
1747         AtProcExit_LocalBuffers();
1748 }
1749
1750 /*
1751  * Helper routine to issue warnings when a buffer is unexpectedly pinned
1752  */
1753 void
1754 PrintBufferLeakWarning(Buffer buffer)
1755 {
1756         volatile BufferDesc *buf;
1757         int32           loccount;
1758         char       *path;
1759         BackendId       backend;
1760
1761         Assert(BufferIsValid(buffer));
1762         if (BufferIsLocal(buffer))
1763         {
1764                 buf = &LocalBufferDescriptors[-buffer - 1];
1765                 loccount = LocalRefCount[-buffer - 1];
1766                 backend = MyBackendId;
1767         }
1768         else
1769         {
1770                 buf = &BufferDescriptors[buffer - 1];
1771                 loccount = PrivateRefCount[buffer - 1];
1772                 backend = InvalidBackendId;
1773         }
1774
1775         /* theoretically we should lock the bufhdr here */
1776         path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
1777         elog(WARNING,
1778                  "buffer refcount leak: [%03d] "
1779                  "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
1780                  buffer, path,
1781                  buf->tag.blockNum, buf->flags,
1782                  buf->refcount, loccount);
1783         pfree(path);
1784 }
1785
1786 /*
1787  * CheckPointBuffers
1788  *
1789  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
1790  *
1791  * Note: temporary relations do not participate in checkpoints, so they don't
1792  * need to be flushed.
1793  */
1794 void
1795 CheckPointBuffers(int flags)
1796 {
1797         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
1798         CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
1799         BufferSync(flags);
1800         CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
1801         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
1802         smgrsync();
1803         CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
1804         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
1805 }
1806
1807
1808 /*
1809  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
1810  */
1811 void
1812 BufmgrCommit(void)
1813 {
1814         /* Nothing to do in bufmgr anymore... */
1815 }
1816
1817 /*
1818  * BufferGetBlockNumber
1819  *              Returns the block number associated with a buffer.
1820  *
1821  * Note:
1822  *              Assumes that the buffer is valid and pinned, else the
1823  *              value may be obsolete immediately...
1824  */
1825 BlockNumber
1826 BufferGetBlockNumber(Buffer buffer)
1827 {
1828         volatile BufferDesc *bufHdr;
1829
1830         Assert(BufferIsPinned(buffer));
1831
1832         if (BufferIsLocal(buffer))
1833                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1834         else
1835                 bufHdr = &BufferDescriptors[buffer - 1];
1836
1837         /* pinned, so OK to read tag without spinlock */
1838         return bufHdr->tag.blockNum;
1839 }
1840
1841 /*
1842  * BufferGetTag
1843  *              Returns the relfilenode, fork number and block number associated with
1844  *              a buffer.
1845  */
1846 void
1847 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
1848                          BlockNumber *blknum)
1849 {
1850         volatile BufferDesc *bufHdr;
1851
1852         /* Do the same checks as BufferGetBlockNumber. */
1853         Assert(BufferIsPinned(buffer));
1854
1855         if (BufferIsLocal(buffer))
1856                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1857         else
1858                 bufHdr = &BufferDescriptors[buffer - 1];
1859
1860         /* pinned, so OK to read tag without spinlock */
1861         *rnode = bufHdr->tag.rnode;
1862         *forknum = bufHdr->tag.forkNum;
1863         *blknum = bufHdr->tag.blockNum;
1864 }
1865
1866 /*
1867  * FlushBuffer
1868  *              Physically write out a shared buffer.
1869  *
1870  * NOTE: this actually just passes the buffer contents to the kernel; the
1871  * real write to disk won't happen until the kernel feels like it.  This
1872  * is okay from our point of view since we can redo the changes from WAL.
1873  * However, we will need to force the changes to disk via fsync before
1874  * we can checkpoint WAL.
1875  *
1876  * The caller must hold a pin on the buffer and have share-locked the
1877  * buffer contents.  (Note: a share-lock does not prevent updates of
1878  * hint bits in the buffer, so the page could change while the write
1879  * is in progress, but we assume that that will not invalidate the data
1880  * written.)
1881  *
1882  * If the caller has an smgr reference for the buffer's relation, pass it
1883  * as the second parameter.  If not, pass NULL.  In the latter case, the
1884  * relation will be marked as "transient" so that the corresponding
1885  * kernel-level file descriptors are closed when the current transaction
1886  * (if any) ends.
1887  */
1888 static void
1889 FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1890 {
1891         XLogRecPtr      recptr;
1892         ErrorContextCallback errcontext;
1893         instr_time      io_start,
1894                                 io_time;
1895
1896         /*
1897          * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
1898          * false, then someone else flushed the buffer before we could, so we need
1899          * not do anything.
1900          */
1901         if (!StartBufferIO(buf, false))
1902                 return;
1903
1904         /* Setup error traceback support for ereport() */
1905         errcontext.callback = shared_buffer_write_error_callback;
1906         errcontext.arg = (void *) buf;
1907         errcontext.previous = error_context_stack;
1908         error_context_stack = &errcontext;
1909
1910         /* Find smgr relation for buffer, and mark it as transient */
1911         if (reln == NULL)
1912         {
1913                 reln = smgropen(buf->tag.rnode, InvalidBackendId);
1914                 smgrsettransient(reln);
1915         }
1916
1917         TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
1918                                                                                 buf->tag.blockNum,
1919                                                                                 reln->smgr_rnode.node.spcNode,
1920                                                                                 reln->smgr_rnode.node.dbNode,
1921                                                                                 reln->smgr_rnode.node.relNode);
1922
1923         /*
1924          * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
1925          * rule that log updates must hit disk before any of the data-file changes
1926          * they describe do.
1927          */
1928         recptr = BufferGetLSN(buf);
1929         XLogFlush(recptr);
1930
1931         /*
1932          * Now it's safe to write buffer to disk. Note that no one else should
1933          * have been able to write it while we were busy with log flushing because
1934          * we have the io_in_progress lock.
1935          */
1936
1937         /* To check if block content changes while flushing. - vadim 01/17/97 */
1938         LockBufHdr(buf);
1939         buf->flags &= ~BM_JUST_DIRTIED;
1940         UnlockBufHdr(buf);
1941
1942         if (track_io_timing)
1943                 INSTR_TIME_SET_CURRENT(io_start);
1944
1945         smgrwrite(reln,
1946                           buf->tag.forkNum,
1947                           buf->tag.blockNum,
1948                           (char *) BufHdrGetBlock(buf),
1949                           false);
1950
1951         if (track_io_timing)
1952         {
1953                 INSTR_TIME_SET_CURRENT(io_time);
1954                 INSTR_TIME_SUBTRACT(io_time, io_start);
1955                 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
1956                 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
1957         }
1958
1959         pgBufferUsage.shared_blks_written++;
1960
1961         /*
1962          * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
1963          * end the io_in_progress state.
1964          */
1965         TerminateBufferIO(buf, true, 0);
1966
1967         TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
1968                                                                            buf->tag.blockNum,
1969                                                                            reln->smgr_rnode.node.spcNode,
1970                                                                            reln->smgr_rnode.node.dbNode,
1971                                                                            reln->smgr_rnode.node.relNode);
1972
1973         /* Pop the error context stack */
1974         error_context_stack = errcontext.previous;
1975 }
1976
1977 /*
1978  * RelationGetNumberOfBlocksInFork
1979  *              Determines the current number of pages in the specified fork of the relation.
1980  */
1981 BlockNumber
1982 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
1983 {
1984         /* Open it at the smgr level if not already done */
1985         RelationOpenSmgr(relation);
1986
1987         return smgrnblocks(relation->rd_smgr, forkNum);
1988 }
1989
1990 /*
1991  * BufferIsPermanent
1992  *              Determines whether a buffer will potentially still be around after
1993  *              a crash.  Caller must hold a buffer pin.
1994  */
1995 bool
1996 BufferIsPermanent(Buffer buffer)
1997 {
1998         volatile BufferDesc *bufHdr;
1999
2000         /* Local buffers are used only for temp relations. */
2001         if (BufferIsLocal(buffer))
2002                 return false;
2003
2004         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2005         Assert(BufferIsValid(buffer));
2006         Assert(BufferIsPinned(buffer));
2007
2008         /*
2009          * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2010          * need not bother with the buffer header spinlock.  Even if someone else
2011          * changes the buffer header flags while we're doing this, we assume that
2012          * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
2013          * old value or the new value, but not random garbage.
2014          */
2015         bufHdr = &BufferDescriptors[buffer - 1];
2016         return (bufHdr->flags & BM_PERMANENT) != 0;
2017 }
2018
2019 /* ---------------------------------------------------------------------
2020  *              DropRelFileNodeBuffers
2021  *
2022  *              This function removes from the buffer pool all the pages of the
2023  *              specified relation fork that have block numbers >= firstDelBlock.
2024  *              (In particular, with firstDelBlock = 0, all pages are removed.)
2025  *              Dirty pages are simply dropped, without bothering to write them
2026  *              out first.      Therefore, this is NOT rollback-able, and so should be
2027  *              used only with extreme caution!
2028  *
2029  *              Currently, this is called only from smgr.c when the underlying file
2030  *              is about to be deleted or truncated (firstDelBlock is needed for
2031  *              the truncation case).  The data in the affected pages would therefore
2032  *              be deleted momentarily anyway, and there is no point in writing it.
2033  *              It is the responsibility of higher-level code to ensure that the
2034  *              deletion or truncation does not lose any data that could be needed
2035  *              later.  It is also the responsibility of higher-level code to ensure
2036  *              that no other process could be trying to load more pages of the
2037  *              relation into buffers.
2038  *
2039  *              XXX currently it sequentially searches the buffer pool, should be
2040  *              changed to more clever ways of searching.  However, this routine
2041  *              is used only in code paths that aren't very performance-critical,
2042  *              and we shouldn't slow down the hot paths to make it faster ...
2043  * --------------------------------------------------------------------
2044  */
2045 void
2046 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2047                                            BlockNumber firstDelBlock)
2048 {
2049         int                     i;
2050
2051         /* If it's a local relation, it's localbuf.c's problem. */
2052         if (RelFileNodeBackendIsTemp(rnode))
2053         {
2054                 if (rnode.backend == MyBackendId)
2055                         DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2056                 return;
2057         }
2058
2059         for (i = 0; i < NBuffers; i++)
2060         {
2061                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2062
2063                 /*
2064                  * We can make this a tad faster by prechecking the buffer tag before
2065                  * we attempt to lock the buffer; this saves a lot of lock
2066                  * acquisitions in typical cases.  It should be safe because the
2067                  * caller must have AccessExclusiveLock on the relation, or some other
2068                  * reason to be certain that no one is loading new pages of the rel
2069                  * into the buffer pool.  (Otherwise we might well miss such pages
2070                  * entirely.)  Therefore, while the tag might be changing while we
2071                  * look at it, it can't be changing *to* a value we care about, only
2072                  * *away* from such a value.  So false negatives are impossible, and
2073                  * false positives are safe because we'll recheck after getting the
2074                  * buffer lock.
2075                  *
2076                  * We could check forkNum and blockNum as well as the rnode, but the
2077                  * incremental win from doing so seems small.
2078                  */
2079                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2080                         continue;
2081
2082                 LockBufHdr(bufHdr);
2083                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2084                         bufHdr->tag.forkNum == forkNum &&
2085                         bufHdr->tag.blockNum >= firstDelBlock)
2086                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2087                 else
2088                         UnlockBufHdr(bufHdr);
2089         }
2090 }
2091
2092 /* ---------------------------------------------------------------------
2093  *              DropRelFileNodeAllBuffers
2094  *
2095  *              This function removes from the buffer pool all the pages of all
2096  *              forks of the specified relation.  It's equivalent to calling
2097  *              DropRelFileNodeBuffers once per fork with firstDelBlock = 0.
2098  * --------------------------------------------------------------------
2099  */
2100 void
2101 DropRelFileNodeAllBuffers(RelFileNodeBackend rnode)
2102 {
2103         int                     i;
2104
2105         /* If it's a local relation, it's localbuf.c's problem. */
2106         if (RelFileNodeBackendIsTemp(rnode))
2107         {
2108                 if (rnode.backend == MyBackendId)
2109                         DropRelFileNodeAllLocalBuffers(rnode.node);
2110                 return;
2111         }
2112
2113         for (i = 0; i < NBuffers; i++)
2114         {
2115                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2116
2117                 /*
2118                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2119                  * and saves some cycles.
2120                  */
2121                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2122                         continue;
2123
2124                 LockBufHdr(bufHdr);
2125                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2126                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2127                 else
2128                         UnlockBufHdr(bufHdr);
2129         }
2130 }
2131
2132 /* ---------------------------------------------------------------------
2133  *              DropDatabaseBuffers
2134  *
2135  *              This function removes all the buffers in the buffer cache for a
2136  *              particular database.  Dirty pages are simply dropped, without
2137  *              bothering to write them out first.      This is used when we destroy a
2138  *              database, to avoid trying to flush data to disk when the directory
2139  *              tree no longer exists.  Implementation is pretty similar to
2140  *              DropRelFileNodeBuffers() which is for destroying just one relation.
2141  * --------------------------------------------------------------------
2142  */
2143 void
2144 DropDatabaseBuffers(Oid dbid)
2145 {
2146         int                     i;
2147
2148         /*
2149          * We needn't consider local buffers, since by assumption the target
2150          * database isn't our own.
2151          */
2152
2153         for (i = 0; i < NBuffers; i++)
2154         {
2155                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2156
2157                 /*
2158                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2159                  * and saves some cycles.
2160                  */
2161                 if (bufHdr->tag.rnode.dbNode != dbid)
2162                         continue;
2163
2164                 LockBufHdr(bufHdr);
2165                 if (bufHdr->tag.rnode.dbNode == dbid)
2166                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2167                 else
2168                         UnlockBufHdr(bufHdr);
2169         }
2170 }
2171
2172 /* -----------------------------------------------------------------
2173  *              PrintBufferDescs
2174  *
2175  *              this function prints all the buffer descriptors, for debugging
2176  *              use only.
2177  * -----------------------------------------------------------------
2178  */
2179 #ifdef NOT_USED
2180 void
2181 PrintBufferDescs(void)
2182 {
2183         int                     i;
2184         volatile BufferDesc *buf = BufferDescriptors;
2185
2186         for (i = 0; i < NBuffers; ++i, ++buf)
2187         {
2188                 /* theoretically we should lock the bufhdr here */
2189                 elog(LOG,
2190                          "[%02d] (freeNext=%d, rel=%s, "
2191                          "blockNum=%u, flags=0x%x, refcount=%u %d)",
2192                          i, buf->freeNext,
2193                   relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2194                          buf->tag.blockNum, buf->flags,
2195                          buf->refcount, PrivateRefCount[i]);
2196         }
2197 }
2198 #endif
2199
2200 #ifdef NOT_USED
2201 void
2202 PrintPinnedBufs(void)
2203 {
2204         int                     i;
2205         volatile BufferDesc *buf = BufferDescriptors;
2206
2207         for (i = 0; i < NBuffers; ++i, ++buf)
2208         {
2209                 if (PrivateRefCount[i] > 0)
2210                 {
2211                         /* theoretically we should lock the bufhdr here */
2212                         elog(LOG,
2213                                  "[%02d] (freeNext=%d, rel=%s, "
2214                                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
2215                                  i, buf->freeNext,
2216                                  relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2217                                  buf->tag.blockNum, buf->flags,
2218                                  buf->refcount, PrivateRefCount[i]);
2219                 }
2220         }
2221 }
2222 #endif
2223
2224 /* ---------------------------------------------------------------------
2225  *              FlushRelationBuffers
2226  *
2227  *              This function writes all dirty pages of a relation out to disk
2228  *              (or more accurately, out to kernel disk buffers), ensuring that the
2229  *              kernel has an up-to-date view of the relation.
2230  *
2231  *              Generally, the caller should be holding AccessExclusiveLock on the
2232  *              target relation to ensure that no other backend is busy dirtying
2233  *              more blocks of the relation; the effects can't be expected to last
2234  *              after the lock is released.
2235  *
2236  *              XXX currently it sequentially searches the buffer pool, should be
2237  *              changed to more clever ways of searching.  This routine is not
2238  *              used in any performance-critical code paths, so it's not worth
2239  *              adding additional overhead to normal paths to make it go faster;
2240  *              but see also DropRelFileNodeBuffers.
2241  * --------------------------------------------------------------------
2242  */
2243 void
2244 FlushRelationBuffers(Relation rel)
2245 {
2246         int                     i;
2247         volatile BufferDesc *bufHdr;
2248
2249         /* Open rel at the smgr level if not already done */
2250         RelationOpenSmgr(rel);
2251
2252         if (RelationUsesLocalBuffers(rel))
2253         {
2254                 for (i = 0; i < NLocBuffer; i++)
2255                 {
2256                         bufHdr = &LocalBufferDescriptors[i];
2257                         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2258                                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2259                         {
2260                                 ErrorContextCallback errcontext;
2261
2262                                 /* Setup error traceback support for ereport() */
2263                                 errcontext.callback = local_buffer_write_error_callback;
2264                                 errcontext.arg = (void *) bufHdr;
2265                                 errcontext.previous = error_context_stack;
2266                                 error_context_stack = &errcontext;
2267
2268                                 smgrwrite(rel->rd_smgr,
2269                                                   bufHdr->tag.forkNum,
2270                                                   bufHdr->tag.blockNum,
2271                                                   (char *) LocalBufHdrGetBlock(bufHdr),
2272                                                   false);
2273
2274                                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
2275
2276                                 /* Pop the error context stack */
2277                                 error_context_stack = errcontext.previous;
2278                         }
2279                 }
2280
2281                 return;
2282         }
2283
2284         /* Make sure we can handle the pin inside the loop */
2285         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2286
2287         for (i = 0; i < NBuffers; i++)
2288         {
2289                 bufHdr = &BufferDescriptors[i];
2290
2291                 /*
2292                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2293                  * and saves some cycles.
2294                  */
2295                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
2296                         continue;
2297
2298                 LockBufHdr(bufHdr);
2299                 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2300                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2301                 {
2302                         PinBuffer_Locked(bufHdr);
2303                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2304                         FlushBuffer(bufHdr, rel->rd_smgr);
2305                         LWLockRelease(bufHdr->content_lock);
2306                         UnpinBuffer(bufHdr, true);
2307                 }
2308                 else
2309                         UnlockBufHdr(bufHdr);
2310         }
2311 }
2312
2313 /* ---------------------------------------------------------------------
2314  *              FlushDatabaseBuffers
2315  *
2316  *              This function writes all dirty pages of a database out to disk
2317  *              (or more accurately, out to kernel disk buffers), ensuring that the
2318  *              kernel has an up-to-date view of the database.
2319  *
2320  *              Generally, the caller should be holding an appropriate lock to ensure
2321  *              no other backend is active in the target database; otherwise more
2322  *              pages could get dirtied.
2323  *
2324  *              Note we don't worry about flushing any pages of temporary relations.
2325  *              It's assumed these wouldn't be interesting.
2326  * --------------------------------------------------------------------
2327  */
2328 void
2329 FlushDatabaseBuffers(Oid dbid)
2330 {
2331         int                     i;
2332         volatile BufferDesc *bufHdr;
2333
2334         /* Make sure we can handle the pin inside the loop */
2335         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2336
2337         for (i = 0; i < NBuffers; i++)
2338         {
2339                 bufHdr = &BufferDescriptors[i];
2340
2341                 /*
2342                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2343                  * and saves some cycles.
2344                  */
2345                 if (bufHdr->tag.rnode.dbNode != dbid)
2346                         continue;
2347
2348                 LockBufHdr(bufHdr);
2349                 if (bufHdr->tag.rnode.dbNode == dbid &&
2350                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2351                 {
2352                         PinBuffer_Locked(bufHdr);
2353                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2354                         FlushBuffer(bufHdr, NULL);
2355                         LWLockRelease(bufHdr->content_lock);
2356                         UnpinBuffer(bufHdr, true);
2357                 }
2358                 else
2359                         UnlockBufHdr(bufHdr);
2360         }
2361 }
2362
2363 /*
2364  * ReleaseBuffer -- release the pin on a buffer
2365  */
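/*
 * Illustrative usage (a sketch, not taken from any particular caller): every
 * successful ReadBuffer must eventually be paired with a ReleaseBuffer, e.g.
 *
 *              Buffer          buf = ReadBuffer(rel, blkno);
 *
 *              ... use the page, taking and releasing the content lock as needed ...
 *
 *              ReleaseBuffer(buf);
 */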
2366 void
2367 ReleaseBuffer(Buffer buffer)
2368 {
2369         volatile BufferDesc *bufHdr;
2370
2371         if (!BufferIsValid(buffer))
2372                 elog(ERROR, "bad buffer ID: %d", buffer);
2373
2374         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
2375
2376         if (BufferIsLocal(buffer))
2377         {
2378                 Assert(LocalRefCount[-buffer - 1] > 0);
2379                 LocalRefCount[-buffer - 1]--;
2380                 return;
2381         }
2382
2383         bufHdr = &BufferDescriptors[buffer - 1];
2384
2385         Assert(PrivateRefCount[buffer - 1] > 0);
2386
2387         if (PrivateRefCount[buffer - 1] > 1)
2388                 PrivateRefCount[buffer - 1]--;
2389         else
2390                 UnpinBuffer(bufHdr, false);
2391 }
2392
2393 /*
2394  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
2395  *
2396  * This is just a shorthand for a common combination.
2397  */
2398 void
2399 UnlockReleaseBuffer(Buffer buffer)
2400 {
2401         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2402         ReleaseBuffer(buffer);
2403 }
2404
2405 /*
2406  * IncrBufferRefCount
2407  *              Increment the pin count on a buffer that we have *already* pinned
2408  *              at least once.
2409  *
2410  *              This function cannot be used on a buffer we do not have pinned,
2411  *              because it doesn't change the shared buffer state.
2412  */
2413 void
2414 IncrBufferRefCount(Buffer buffer)
2415 {
2416         Assert(BufferIsPinned(buffer));
2417         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2418         ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
2419         if (BufferIsLocal(buffer))
2420                 LocalRefCount[-buffer - 1]++;
2421         else
2422                 PrivateRefCount[buffer - 1]++;
2423 }
2424
2425 /*
2426  * SetBufferCommitInfoNeedsSave
2427  *
2428  *      Mark a buffer dirty when we have updated tuple commit-status bits in it.
2429  *
2430  * This is essentially the same as MarkBufferDirty, except that the caller
2431  * might have only share-lock instead of exclusive-lock on the buffer's
2432  * content lock.  We preserve the distinction mainly as a way of documenting
2433  * that the caller has not made a critical data change --- the status-bit
2434  * update could be redone by someone else just as easily.  Therefore, no WAL
2435  * log record need be generated, whereas calls to MarkBufferDirty really ought
2436  * to be associated with a WAL-entry-creating action.
2437  */
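/*
 * Illustrative pointer (not an exhaustive list): the typical callers are the
 * hint-bit setters in the tuple-visibility code (utils/time/tqual.c), which
 * update t_infomask while holding only a share lock and then call this
 * routine.
 */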
2438 void
2439 SetBufferCommitInfoNeedsSave(Buffer buffer)
2440 {
2441         volatile BufferDesc *bufHdr;
2442
2443         if (!BufferIsValid(buffer))
2444                 elog(ERROR, "bad buffer ID: %d", buffer);
2445
2446         if (BufferIsLocal(buffer))
2447         {
2448                 MarkLocalBufferDirty(buffer);
2449                 return;
2450         }
2451
2452         bufHdr = &BufferDescriptors[buffer - 1];
2453
2454         Assert(PrivateRefCount[buffer - 1] > 0);
2455         /* here, either share or exclusive lock is OK */
2456         Assert(LWLockHeldByMe(bufHdr->content_lock));
2457
2458         /*
2459          * This routine might get called many times on the same page, if we are
2460          * making the first scan after commit of an xact that added/deleted many
2461          * tuples.      So, be as quick as we can if the buffer is already dirty.  We
2462          * do this by not acquiring spinlock if it looks like the status bits are
2463          * do this by not acquiring the spinlock if it looks like the status bits
2464          * are already set.  Since we make this test unlocked, there's a chance we
2465          * might fail to notice that the flags have just been cleared, and fail to
2466          * only intended to be used in cases where failing to write out the data
2467          * would be harmless anyway, it doesn't really matter.
2468          */
2469         if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
2470                 (BM_DIRTY | BM_JUST_DIRTIED))
2471         {
2472                 LockBufHdr(bufHdr);
2473                 Assert(bufHdr->refcount > 0);
2474                 if (!(bufHdr->flags & BM_DIRTY))
2475                 {
2476                         /* Do vacuum cost accounting */
2477                         VacuumPageDirty++;
2478                         if (VacuumCostActive)
2479                                 VacuumCostBalance += VacuumCostPageDirty;
2480                 }
2481                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
2482                 UnlockBufHdr(bufHdr);
2483         }
2484 }
2485
2486 /*
2487  * Release buffer content locks for shared buffers.
2488  *
2489  * Used to clean up after errors.
2490  *
2491  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
2492  * of releasing buffer content locks per se; the only thing we need to deal
2493  * with here is clearing any PIN_COUNT request that was in progress.
2494  */
2495 void
2496 UnlockBuffers(void)
2497 {
2498         volatile BufferDesc *buf = PinCountWaitBuf;
2499
2500         if (buf)
2501         {
2502                 LockBufHdr(buf);
2503
2504                 /*
2505                  * Don't complain if flag bit not set; it could have been reset but we
2506                  * got a cancel/die interrupt before getting the signal.
2507                  */
2508                 if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
2509                         buf->wait_backend_pid == MyProcPid)
2510                         buf->flags &= ~BM_PIN_COUNT_WAITER;
2511
2512                 UnlockBufHdr(buf);
2513
2514                 PinCountWaitBuf = NULL;
2515         }
2516 }
2517
2518 /*
2519  * Acquire or release the content_lock for the buffer.
2520  */
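/*
 * Illustrative usage (a sketch): the caller must already hold a pin on the
 * buffer; a typical read-only access looks like
 *
 *              LockBuffer(buf, BUFFER_LOCK_SHARE);
 *              ... read the page contents ...
 *              LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 */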
2521 void
2522 LockBuffer(Buffer buffer, int mode)
2523 {
2524         volatile BufferDesc *buf;
2525
2526         Assert(BufferIsValid(buffer));
2527         if (BufferIsLocal(buffer))
2528                 return;                                 /* local buffers need no lock */
2529
2530         buf = &(BufferDescriptors[buffer - 1]);
2531
2532         if (mode == BUFFER_LOCK_UNLOCK)
2533                 LWLockRelease(buf->content_lock);
2534         else if (mode == BUFFER_LOCK_SHARE)
2535                 LWLockAcquire(buf->content_lock, LW_SHARED);
2536         else if (mode == BUFFER_LOCK_EXCLUSIVE)
2537                 LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
2538         else
2539                 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
2540 }
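
/*
 * Illustrative sketch (kept out of the build) of the usual pin-then-lock
 * discipline for reading a page; function and variable names are invented.
 */
#ifdef NOT_USED
static void
example_inspect_page(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBuffer(rel, blkno);   /* pin the page */
        Page            page;

        LockBuffer(buf, BUFFER_LOCK_SHARE);
        page = BufferGetPage(buf);
        /* ... examine page contents here ... */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buf);                             /* drop the pin */
}
#endif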
2541
2542 /*
2543  * Acquire the content_lock for the buffer, but only if we don't have to wait.
2544  *
2545  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
2546  */
2547 bool
2548 ConditionalLockBuffer(Buffer buffer)
2549 {
2550         volatile BufferDesc *buf;
2551
2552         Assert(BufferIsValid(buffer));
2553         if (BufferIsLocal(buffer))
2554                 return true;                    /* act as though we got it */
2555
2556         buf = &(BufferDescriptors[buffer - 1]);
2557
2558         return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
2559 }
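
/*
 * Illustrative sketch (kept out of the build): opportunistic callers
 * typically skip the page rather than wait when the lock is contended.
 * The helper name is invented for illustration.
 */
#ifdef NOT_USED
static bool
example_try_exclusive_work(Buffer buf)
{
        if (!ConditionalLockBuffer(buf))
                return false;                   /* contended; caller retries later */

        /* ... modify the page and MarkBufferDirty(buf) here ... */

        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        return true;
}
#endif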
2560
2561 /*
2562  * LockBufferForCleanup - lock a buffer in preparation for deleting items
2563  *
2564  * Items may be deleted from a disk page only when the caller (a) holds an
2565  * exclusive lock on the buffer and (b) has observed that no other backend
2566  * holds a pin on the buffer.  If there is a pin, then the other backend
2567  * might have a pointer into the buffer (for example, a heapscan reference
2568  * to an item --- see README for more details).  It's OK if a pin is added
2569  * after the cleanup starts, however; the newly-arrived backend will be
2570  * unable to look at the page until we release the exclusive lock.
2571  *
2572  * To implement this protocol, a would-be deleter must pin the buffer and
2573  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
2574  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
2575  * it has successfully observed pin count = 1.
2576  */
2577 void
2578 LockBufferForCleanup(Buffer buffer)
2579 {
2580         volatile BufferDesc *bufHdr;
2581
2582         Assert(BufferIsValid(buffer));
2583         Assert(PinCountWaitBuf == NULL);
2584
2585         if (BufferIsLocal(buffer))
2586         {
2587                 /* There should be exactly one pin */
2588                 if (LocalRefCount[-buffer - 1] != 1)
2589                         elog(ERROR, "incorrect local pin count: %d",
2590                                  LocalRefCount[-buffer - 1]);
2591                 /* Nobody else to wait for */
2592                 return;
2593         }
2594
2595         /* There should be exactly one local pin */
2596         if (PrivateRefCount[buffer - 1] != 1)
2597                 elog(ERROR, "incorrect local pin count: %d",
2598                          PrivateRefCount[buffer - 1]);
2599
2600         bufHdr = &BufferDescriptors[buffer - 1];
2601
2602         for (;;)
2603         {
2604                 /* Try to acquire lock */
2605                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
2606                 LockBufHdr(bufHdr);
2607                 Assert(bufHdr->refcount > 0);
2608                 if (bufHdr->refcount == 1)
2609                 {
2610                         /* Successfully acquired exclusive lock with pincount 1 */
2611                         UnlockBufHdr(bufHdr);
2612                         return;
2613                 }
2614                 /* Failed, so mark myself as waiting for pincount 1 */
2615                 if (bufHdr->flags & BM_PIN_COUNT_WAITER)
2616                 {
2617                         UnlockBufHdr(bufHdr);
2618                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2619                         elog(ERROR, "multiple backends attempting to wait for pincount 1");
2620                 }
2621                 bufHdr->wait_backend_pid = MyProcPid;
2622                 bufHdr->flags |= BM_PIN_COUNT_WAITER;
2623                 PinCountWaitBuf = bufHdr;
2624                 UnlockBufHdr(bufHdr);
2625                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2626
2627                 /* Wait to be signaled by UnpinBuffer() */
2628                 if (InHotStandby)
2629                 {
2630                         /* Publish the bufid that Startup process waits on */
2631                         SetStartupBufferPinWaitBufId(buffer - 1);
2632                         /* Set alarm and then wait to be signaled by UnpinBuffer() */
2633                         ResolveRecoveryConflictWithBufferPin();
2634                         /* Reset the published bufid */
2635                         SetStartupBufferPinWaitBufId(-1);
2636                 }
2637                 else
2638                         ProcWaitForSignal();
2639
2640                 PinCountWaitBuf = NULL;
2641                 /* Loop back and try again */
2642         }
2643 }
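
/*
 * Illustrative sketch (kept out of the build) of the deletion protocol
 * described above: pin first, then wait for the cleanup lock before
 * removing items.  Names are invented for illustration.
 */
#ifdef NOT_USED
static void
example_delete_items(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBuffer(rel, blkno);   /* take our own pin */

        LockBufferForCleanup(buf);      /* exclusive lock, sole pin */
        /* ... remove items from BufferGetPage(buf) ... */
        MarkBufferDirty(buf);
        UnlockReleaseBuffer(buf);
}
#endif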
2644
2645 /*
2646  * Check called from RecoveryConflictInterrupt handler when Startup
2647  * process requests cancellation of all pin holders that are blocking it.
2648  */
2649 bool
2650 HoldingBufferPinThatDelaysRecovery(void)
2651 {
2652         int                     bufid = GetStartupBufferPinWaitBufId();
2653
2654         /*
2655          * If we get woken slowly, it's possible that the Startup process was
2656          * already woken by other backends before we got here.  It's also possible
2657          * that we got here via multiple interrupts or an interrupt at an
2658          * inappropriate time, so make sure we do nothing if the bufid is not set.
2659          */
2660         if (bufid < 0)
2661                 return false;
2662
2663         if (PrivateRefCount[bufid] > 0)
2664                 return true;
2665
2666         return false;
2667 }
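
/*
 * Illustrative sketch (kept out of the build) of how an interrupt handler
 * might consult this check; the real decision logic is in postgres.c's
 * RecoveryConflictInterrupt(), and the helper below is only an outline.
 */
#ifdef NOT_USED
static bool
example_should_cancel_for_startup(void)
{
        /* Cancel only if we really are the backend blocking the Startup process */
        return HoldingBufferPinThatDelaysRecovery();
}
#endif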
2668
2669 /*
2670  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
2671  *
2672  * We won't loop, but just check once to see if the pin count is OK.  If
2673  * not, return FALSE with no lock held.
2674  */
2675 bool
2676 ConditionalLockBufferForCleanup(Buffer buffer)
2677 {
2678         volatile BufferDesc *bufHdr;
2679
2680         Assert(BufferIsValid(buffer));
2681
2682         if (BufferIsLocal(buffer))
2683         {
2684                 /* There should be exactly one pin */
2685                 Assert(LocalRefCount[-buffer - 1] > 0);
2686                 if (LocalRefCount[-buffer - 1] != 1)
2687                         return false;
2688                 /* Nobody else to wait for */
2689                 return true;
2690         }
2691
2692         /* There should be exactly one local pin */
2693         Assert(PrivateRefCount[buffer - 1] > 0);
2694         if (PrivateRefCount[buffer - 1] != 1)
2695                 return false;
2696
2697         /* Try to acquire lock */
2698         if (!ConditionalLockBuffer(buffer))
2699                 return false;
2700
2701         bufHdr = &BufferDescriptors[buffer - 1];
2702         LockBufHdr(bufHdr);
2703         Assert(bufHdr->refcount > 0);
2704         if (bufHdr->refcount == 1)
2705         {
2706                 /* Successfully acquired exclusive lock with pincount 1 */
2707                 UnlockBufHdr(bufHdr);
2708                 return true;
2709         }
2710
2711         /* Failed, so release the lock */
2712         UnlockBufHdr(bufHdr);
2713         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2714         return false;
2715 }
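
/*
 * Illustrative sketch (kept out of the build): a vacuum-style caller skips
 * pages whose cleanup lock is busy and revisits them later.  Names are
 * invented for illustration.
 */
#ifdef NOT_USED
static void
example_maybe_clean_page(Relation rel, BlockNumber blkno)
{
        Buffer          buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
                                                                                 RBM_NORMAL, NULL);

        if (!ConditionalLockBufferForCleanup(buf))
        {
                ReleaseBuffer(buf);             /* contended: come back on a later pass */
                return;
        }
        /* ... prune/defragment the page, MarkBufferDirty() if changed ... */
        UnlockReleaseBuffer(buf);
}
#endif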
2716
2717
2718 /*
2719  *      Functions for buffer I/O handling
2720  *
2721  *      Note: We assume that nested buffer I/O never occurs;
2722  *      i.e., at most one io_in_progress lock is held per proc.
2723  *
2724  *      Also note that these are used only for shared buffers, not local ones.
2725  */
2726
2727 /*
2728  * WaitIO -- Block until the BM_IO_IN_PROGRESS flag on 'buf' is cleared.
2729  */
2730 static void
2731 WaitIO(volatile BufferDesc *buf)
2732 {
2733         /*
2734          * Changed to wait until there's no IO - Inoue 01/13/2000
2735          *
2736          * Note this is *necessary* because an error abort in the process doing
2737          * I/O could release the io_in_progress_lock prematurely. See
2738          * AbortBufferIO.
2739          */
2740         for (;;)
2741         {
2742                 BufFlags        sv_flags;
2743
2744                 /*
2745                  * It may not be necessary to acquire the spinlock to check the flag
2746                  * here, but since this test is essential for correctness, we'd better
2747                  * play it safe.
2748                  */
2749                 LockBufHdr(buf);
2750                 sv_flags = buf->flags;
2751                 UnlockBufHdr(buf);
2752                 if (!(sv_flags & BM_IO_IN_PROGRESS))
2753                         break;
2754                 LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
2755                 LWLockRelease(buf->io_in_progress_lock);
2756         }
2757 }
2758
2759 /*
2760  * StartBufferIO: begin I/O on this buffer
2761  *      (Assumptions)
2762  *      My process is executing no IO
2763  *      The buffer is Pinned
2764  *
2765  * In some scenarios there are race conditions in which multiple backends
2766  * could attempt the same I/O operation concurrently.  If someone else
2767  * has already started I/O on this buffer then we will block on the
2768  * io_in_progress lock until he's done.
2769  *
2770  * Input operations are only attempted on buffers that are not BM_VALID,
2771  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
2772  * so we can always tell if the work is already done.
2773  *
2774  * Returns TRUE if we successfully marked the buffer as I/O busy,
2775  * FALSE if someone else already did the work.
2776  */
2777 static bool
2778 StartBufferIO(volatile BufferDesc *buf, bool forInput)
2779 {
2780         Assert(!InProgressBuf);
2781
2782         for (;;)
2783         {
2784                 /*
2785                  * Grab the io_in_progress lock so that other processes can wait for
2786                  * me to finish the I/O.
2787                  */
2788                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
2789
2790                 LockBufHdr(buf);
2791
2792                 if (!(buf->flags & BM_IO_IN_PROGRESS))
2793                         break;
2794
2795                 /*
2796                  * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
2797                  * lock isn't held is if the process doing the I/O is recovering from
2798                  * an error (see AbortBufferIO).  If that's the case, we must wait for
2799                  * him to get unwedged.
2800                  */
2801                 UnlockBufHdr(buf);
2802                 LWLockRelease(buf->io_in_progress_lock);
2803                 WaitIO(buf);
2804         }
2805
2806         /* Once we get here, there is definitely no I/O active on this buffer */
2807
2808         if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
2809         {
2810                 /* someone else already did the I/O */
2811                 UnlockBufHdr(buf);
2812                 LWLockRelease(buf->io_in_progress_lock);
2813                 return false;
2814         }
2815
2816         buf->flags |= BM_IO_IN_PROGRESS;
2817
2818         UnlockBufHdr(buf);
2819
2820         InProgressBuf = buf;
2821         IsForInput = forInput;
2822
2823         return true;
2824 }
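
/*
 * Condensed, illustrative sketch (kept out of the build) of how callers in
 * this file pair StartBufferIO with TerminateBufferIO; the real logic, with
 * its retry and error handling, is in ReadBuffer_common() and FlushBuffer().
 */
#ifdef NOT_USED
static void
example_read_into_buffer(SMgrRelation smgr, ForkNumber forkNum,
                                                 BlockNumber blockNum, volatile BufferDesc *bufHdr)
{
        if (!StartBufferIO(bufHdr, true))
                return;                                 /* some other backend already read it */

        smgrread(smgr, forkNum, blockNum, (char *) BufHdrGetBlock(bufHdr));

        /* mark the buffer valid and wake any WaitIO() waiters */
        TerminateBufferIO(bufHdr, false, BM_VALID);
}
#endif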
2825
2826 /*
2827  * TerminateBufferIO: release a buffer we were doing I/O on
2828  *      (Assumptions)
2829  *      My process is executing IO for the buffer
2830  *      BM_IO_IN_PROGRESS bit is set for the buffer
2831  *      We hold the buffer's io_in_progress lock
2832  *      The buffer is Pinned
2833  *
2834  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
2835  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
2836  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
2837  * marking the buffer clean if it was re-dirtied while we were writing.
2838  *
2839  * set_flag_bits gets ORed into the buffer's flags.  It must include
2840  * BM_IO_ERROR in a failure case.  For successful completion it could
2841  * be 0, or BM_VALID if we just finished reading in the page.
2842  */
2843 static void
2844 TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
2845                                   int set_flag_bits)
2846 {
2847         Assert(buf == InProgressBuf);
2848
2849         LockBufHdr(buf);
2850
2851         Assert(buf->flags & BM_IO_IN_PROGRESS);
2852         buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
2853         if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
2854                 buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
2855         buf->flags |= set_flag_bits;
2856
2857         UnlockBufHdr(buf);
2858
2859         InProgressBuf = NULL;
2860
2861         LWLockRelease(buf->io_in_progress_lock);
2862 }
2863
2864 /*
2865  * AbortBufferIO: Clean up any active buffer I/O after an error.
2866  *
2867  *      All LWLocks we might have held have been released,
2868  *      but we haven't yet released buffer pins, so the buffer is still pinned.
2869  *
2870  *      If I/O was in progress, we always set BM_IO_ERROR, even though it's
2871  *      possible the error condition wasn't related to the I/O.
2872  */
2873 void
2874 AbortBufferIO(void)
2875 {
2876         volatile BufferDesc *buf = InProgressBuf;
2877
2878         if (buf)
2879         {
2880                 /*
2881                  * Since LWLockReleaseAll has already been called, we're not holding
2882                  * the buffer's io_in_progress_lock. We have to re-acquire it so that
2883                  * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
2884                  * buffer will be in a busy spin until we succeed in doing this.
2885                  */
2886                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
2887
2888                 LockBufHdr(buf);
2889                 Assert(buf->flags & BM_IO_IN_PROGRESS);
2890                 if (IsForInput)
2891                 {
2892                         Assert(!(buf->flags & BM_DIRTY));
2893                         /* We'd better not think buffer is valid yet */
2894                         Assert(!(buf->flags & BM_VALID));
2895                         UnlockBufHdr(buf);
2896                 }
2897                 else
2898                 {
2899                         BufFlags        sv_flags;
2900
2901                         sv_flags = buf->flags;
2902                         Assert(sv_flags & BM_DIRTY);
2903                         UnlockBufHdr(buf);
2904                         /* Issue a warning if this is not the first failure... */
2905                         if (sv_flags & BM_IO_ERROR)
2906                         {
2907                                 /* Buffer is pinned, so we can read tag without spinlock */
2908                                 char       *path;
2909
2910                                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
2911                                 ereport(WARNING,
2912                                                 (errcode(ERRCODE_IO_ERROR),
2913                                                  errmsg("could not write block %u of %s",
2914                                                                 buf->tag.blockNum, path),
2915                                                  errdetail("Multiple failures --- write error might be permanent.")));
2916                                 pfree(path);
2917                         }
2918                 }
2919                 TerminateBufferIO(buf, false, BM_IO_ERROR);
2920         }
2921 }
2922
2923 /*
2924  * Error context callback for errors occurring during shared buffer writes.
2925  */
2926 static void
2927 shared_buffer_write_error_callback(void *arg)
2928 {
2929         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
2930
2931         /* Buffer is pinned, so we can read the tag without locking the spinlock */
2932         if (bufHdr != NULL)
2933         {
2934                 char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
2935
2936                 errcontext("writing block %u of relation %s",
2937                                    bufHdr->tag.blockNum, path);
2938                 pfree(path);
2939         }
2940 }
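
/*
 * Illustrative sketch (kept out of the build) of how such a callback gets
 * installed around a write; FlushBuffer() follows essentially this shape.
 * The helper name is invented for illustration.
 */
#ifdef NOT_USED
static void
example_write_with_error_context(volatile BufferDesc *bufHdr)
{
        ErrorContextCallback errcallback;

        /* push our callback onto the error context stack */
        errcallback.callback = shared_buffer_write_error_callback;
        errcallback.arg = (void *) bufHdr;
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;

        /* ... do the smgrwrite(); any ereport() here gets our context line ... */

        /* pop the error context stack */
        error_context_stack = errcallback.previous;
}
#endif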
2941
2942 /*
2943  * Error context callback for errors occurring during local buffer writes.
2944  */
2945 static void
2946 local_buffer_write_error_callback(void *arg)
2947 {
2948         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
2949
2950         if (bufHdr != NULL)
2951         {
2952                 char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
2953                                                                                   bufHdr->tag.forkNum);
2954
2955                 errcontext("writing block %u of relation %s",
2956                                    bufHdr->tag.blockNum, path);
2957                 pfree(path);
2958         }
2959 }