1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *              and pin it so that no one can destroy it while this process
20  *              is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *              The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *              freelist.c -- chooses victim for buffer replacement
29  *              buf_table.c -- manages the buffer lookup table
30  */
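
/*
 * A minimal usage sketch of these entry points (illustrative only; the
 * relation "rel" and block number "blkno" are assumed to be supplied by the
 * caller, and error handling is omitted):
 *
 *		Buffer		buf;
 *		Page		page;
 *
 *		buf = ReadBuffer(rel, blkno);
 *		LockBuffer(buf, BUFFER_LOCK_SHARE);
 *		page = BufferGetPage(buf);
 *		if (PageIsNew(page))
 *			elog(DEBUG1, "block %u of \"%s\" is uninitialized",
 *				 blkno, RelationGetRelationName(rel));
 *		UnlockReleaseBuffer(buf);
 *
 * To modify the page instead, take BUFFER_LOCK_EXCLUSIVE and call
 * MarkBufferDirty() before unlocking and releasing the buffer.
 */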
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "access/xlog.h"
37 #include "catalog/catalog.h"
38 #include "catalog/storage.h"
39 #include "executor/instrument.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/buf_internals.h"
45 #include "storage/bufmgr.h"
46 #include "storage/ipc.h"
47 #include "storage/proc.h"
48 #include "storage/smgr.h"
49 #include "storage/standby.h"
50 #include "utils/rel.h"
51 #include "utils/resowner_private.h"
52 #include "utils/timestamp.h"
53
54
55 /* Note: these two macros only work on shared buffers, not local ones! */
56 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
57 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
58
59 /* Note: this macro only works on local buffers, not shared ones! */
60 #define LocalBufHdrGetBlock(bufHdr) \
61         LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
62
63 /* Bits in SyncOneBuffer's return value */
64 #define BUF_WRITTEN                             0x01
65 #define BUF_REUSABLE                    0x02
66
67 #define DROP_RELS_BSEARCH_THRESHOLD             20
68
69 typedef struct PrivateRefCountEntry
70 {
71         Buffer buffer;
72         int32 refcount;
73 } PrivateRefCountEntry;
74
75 /* 64 bytes, about the size of a cache line on common systems */
76 #define REFCOUNT_ARRAY_ENTRIES 8
77
78 /* GUC variables */
79 bool            zero_damaged_pages = false;
80 int                     bgwriter_lru_maxpages = 100;
81 double          bgwriter_lru_multiplier = 2.0;
82 bool            track_io_timing = false;
83
84 /*
85  * How many buffers PrefetchBuffer callers should try to stay ahead of their
86  * ReadBuffer calls by.  This is maintained by the assign hook for
87  * effective_io_concurrency.  Zero means "never prefetch".
88  */
89 int                     target_prefetch_pages = 0;
90
91 /* local state for StartBufferIO and related functions */
92 static volatile BufferDesc *InProgressBuf = NULL;
93 static bool IsForInput;
94
95 /* local state for LockBufferForCleanup */
96 static volatile BufferDesc *PinCountWaitBuf = NULL;
97
98 /*
99  * Backend-Private refcount management:
100  *
101  * Each buffer also has a private refcount that keeps track of the number of
102  * times the buffer is pinned in the current process.  This is so that the
103  * shared refcount needs to be modified only once if a buffer is pinned more
104  * than once by an individual backend.  It's also used to check that no buffers
105  * are still pinned at the end of transactions and when exiting.
106  *
107  *
108  * To avoid - as we used to - requiring an array with NBuffers entries to keep
109  * track of local buffers, we use a small sequentially searched array
110  * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
111  * keep track of backend local pins.
112  *
113  * As long as no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once,
114  * all refcounts are tracked in the array; after that, new array entries
115  * displace old ones into the hash table. That way a frequently used entry
116  * can't get "stuck" in the hashtable while infrequent ones clog the array.
117  *
118  * Note that in most scenarios the number of pinned buffers will not exceed
119  * REFCOUNT_ARRAY_ENTRIES.
120  *
121  *
122  * To enter a buffer into the refcount tracking mechanism, first reserve a free
123  * entry using ReservePrivateRefCountEntry() and then later, if necessary,
124  * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
125  * memory allocations in NewPrivateRefCountEntry(), which can be important
126  * because in some scenarios it's called with a spinlock held...
127  */
128 static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
129 static HTAB *PrivateRefCountHash = NULL;
130 static int32 PrivateRefCountOverflowed = 0;
131 static uint32 PrivateRefCountClock = 0;
132 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
133
134 static void ReservePrivateRefCountEntry(void);
135 static PrivateRefCountEntry* NewPrivateRefCountEntry(Buffer buffer);
136 static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool do_move);
137 static inline int32 GetPrivateRefCount(Buffer buffer);
138 static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
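
/*
 * Sketch of the reserve-then-fill pattern described above (illustrative
 * only; "b" stands for some shared Buffer, and real callers such as
 * PinBuffer() wrap this in the appropriate locking):
 *
 *		PrivateRefCountEntry *ref;
 *
 *		ReservePrivateRefCountEntry();
 *		ref = GetPrivateRefCountEntry(b, true);
 *		if (ref == NULL)
 *			ref = NewPrivateRefCountEntry(b);
 *		ref->refcount++;
 *
 * The reservation happens while no spinlock is held, so
 * NewPrivateRefCountEntry() itself never needs to allocate memory.
 */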
139
140 /*
141  * Ensure that the PrivateRefCountArray has sufficient space to store one more
142  * entry. This has to be called before using NewPrivateRefCountEntry() to fill
143  * a new entry - but it's perfectly fine to not use a reserved entry.
144  */
145 static void
146 ReservePrivateRefCountEntry(void)
147 {
148         /* Already reserved (or freed), nothing to do */
149         if (ReservedRefCountEntry != NULL)
150                 return;
151
152         /*
153          * First search for a free entry in the array; that'll be sufficient in the
154          * majority of cases.
155          */
156         {
157                 int i;
158
159                 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
160                 {
161                         PrivateRefCountEntry *res;
162
163                         res = &PrivateRefCountArray[i];
164
165                         if (res->buffer == InvalidBuffer)
166                         {
167                                 ReservedRefCountEntry = res;
168                                 return;
169                         }
170                 }
171         }
172
173         /*
174          * No luck. All array entries are full. Move one array entry into the hash
175          * table.
176          */
177         {
178                 /*
179                  * Move entry from the current clock position in the array into the
180                  * hashtable. Use that slot.
181                  */
182                 PrivateRefCountEntry *hashent;
183                 bool found;
184
185                 /* select victim slot */
186                 ReservedRefCountEntry  =
187                         &PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
188
189                 /* Better be used, otherwise we shouldn't get here. */
190                 Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
191
192                 /* enter victim array entry into hashtable */
193                 hashent = hash_search(PrivateRefCountHash,
194                                                           (void *) &(ReservedRefCountEntry->buffer),
195                                                           HASH_ENTER,
196                                                           &found);
197                 Assert(!found);
198                 hashent->refcount = ReservedRefCountEntry->refcount;
199
200                 /* clear the now free array slot */
201                 ReservedRefCountEntry->buffer = InvalidBuffer;
202                 ReservedRefCountEntry->refcount = 0;
203
204                 PrivateRefCountOverflowed++;
205         }
206 }
207
208 /*
209  * Fill a previously reserved refcount entry.
210  */
211 static PrivateRefCountEntry*
212 NewPrivateRefCountEntry(Buffer buffer)
213 {
214         PrivateRefCountEntry *res;
215
216         /* only allowed to be called when a reservation has been made */
217         Assert(ReservedRefCountEntry != NULL);
218
219         /* use up the reserved entry */
220         res = ReservedRefCountEntry;
221         ReservedRefCountEntry = NULL;
222
223         /* and fill it */
224         res->buffer = buffer;
225         res->refcount = 0;
226
227         return res;
228 }
229
230 /*
231  * Return the PrivateRefCount entry for the passed buffer.
232  *
233  * Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
234  * do_move is true and the entry resides in the hashtable, the entry is
235  * optimized for frequent access by moving it to the array.
236  */
237 static PrivateRefCountEntry*
238 GetPrivateRefCountEntry(Buffer buffer, bool do_move)
239 {
240         PrivateRefCountEntry *res;
241         int                     i;
242
243         Assert(BufferIsValid(buffer));
244         Assert(!BufferIsLocal(buffer));
245
246         /*
247          * First search for references in the array; that'll be sufficient in the
248          * majority of cases.
249          */
250         for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
251         {
252                 res = &PrivateRefCountArray[i];
253
254                 if (res->buffer == buffer)
255                         return res;
256         }
257
258         /*
259          * By here we know that the buffer, if already pinned, isn't residing in
260          * the array.
261          *
262          * Only look up the buffer in the hashtable if we've previously overflowed
263          * into it.
264          */
265         if (PrivateRefCountOverflowed == 0)
266                 return NULL;
267
268         res = hash_search(PrivateRefCountHash,
269                                           (void *) &buffer,
270                                           HASH_FIND,
271                                           NULL);
272
273         if (res == NULL)
274                 return NULL;
275         else if (!do_move)
276         {
277                 /* caller doesn't want us to move the hash entry into the array */
278                 return res;
279         }
280         else
281         {
282                 /* move buffer from hashtable into the free array slot */
283                 bool found;
284                 PrivateRefCountEntry *free;
285
286                 /* Ensure there's a free array slot */
287                 ReservePrivateRefCountEntry();
288
289                 /* Use up the reserved slot */
290                 Assert(ReservedRefCountEntry != NULL);
291                 free = ReservedRefCountEntry;
292                 ReservedRefCountEntry = NULL;
293                 Assert(free->buffer == InvalidBuffer);
294
295                 /* and fill it */
296                 free->buffer = buffer;
297                 free->refcount = res->refcount;
298
299                 /* delete from hashtable */
300                 hash_search(PrivateRefCountHash,
301                                         (void *) &buffer,
302                                         HASH_REMOVE,
303                                         &found);
304                 Assert(found);
305                 Assert(PrivateRefCountOverflowed > 0);
306                 PrivateRefCountOverflowed--;
307
308                 return free;
309         }
310 }
311
312 /*
313  * Returns how many times the passed buffer is pinned by this backend.
314  *
315  * Only works for shared memory buffers!
316  */
317 static inline int32
318 GetPrivateRefCount(Buffer buffer)
319 {
320         PrivateRefCountEntry *ref;
321
322         Assert(BufferIsValid(buffer));
323         Assert(!BufferIsLocal(buffer));
324
325         /*
326          * Not moving the entry - that's ok for the current users, but we might
327          * want to change this one day.
328          */
329         ref = GetPrivateRefCountEntry(buffer, false);
330
331         if (ref == NULL)
332                 return 0;
333         return ref->refcount;
334 }
335
336 /*
337  * Release resources used to track the reference count of a buffer which we no
338  * longer have pinned and don't want to pin again immediately.
339  */
340 static void
341 ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
342 {
343         Assert(ref->refcount == 0);
344
345         if (ref >= &PrivateRefCountArray[0] &&
346                 ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
347         {
348                 ref->buffer = InvalidBuffer;
349                 /*
350                  * Mark the just used entry as reserved - in many scenarios that
351                  * allows us to avoid ever having to search the array/hash for free
352                  * entries.
353                  */
354                 ReservedRefCountEntry = ref;
355         }
356         else
357         {
358                 bool found;
359                 Buffer buffer = ref->buffer;
360                 hash_search(PrivateRefCountHash,
361                                         (void *) &buffer,
362                                         HASH_REMOVE,
363                                         &found);
364                 Assert(found);
365                 Assert(PrivateRefCountOverflowed > 0);
366                 PrivateRefCountOverflowed--;
367         }
368 }
369
370 /*
371  * BufferIsPinned
372  *              True iff the buffer is pinned (also checks for valid buffer number).
373  *
374  *              NOTE: what we check here is that *this* backend holds a pin on
375  *              the buffer.  We do not care whether some other backend does.
376  */
377 #define BufferIsPinned(bufnum) \
378 ( \
379         !BufferIsValid(bufnum) ? \
380                 false \
381         : \
382                 BufferIsLocal(bufnum) ? \
383                         (LocalRefCount[-(bufnum) - 1] > 0) \
384                 : \
385         (GetPrivateRefCount(bufnum) > 0) \
386 )
387
388
389 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
390                                   ForkNumber forkNum, BlockNumber blockNum,
391                                   ReadBufferMode mode, BufferAccessStrategy strategy,
392                                   bool *hit);
393 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
394 static void PinBuffer_Locked(volatile BufferDesc *buf);
395 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
396 static void BufferSync(int flags);
397 static int      SyncOneBuffer(int buf_id, bool skip_recently_used);
398 static void WaitIO(volatile BufferDesc *buf);
399 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
400 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
401                                   int set_flag_bits);
402 static void shared_buffer_write_error_callback(void *arg);
403 static void local_buffer_write_error_callback(void *arg);
404 static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
405                         char relpersistence,
406                         ForkNumber forkNum,
407                         BlockNumber blockNum,
408                         BufferAccessStrategy strategy,
409                         bool *foundPtr);
410 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
411 static void AtProcExit_Buffers(int code, Datum arg);
412 static void CheckForBufferLeaks(void);
413 static int      rnode_comparator(const void *p1, const void *p2);
414
415
416 /*
417  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
418  *
419  * This is named by analogy to ReadBuffer but doesn't actually allocate a
420  * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
421  * block will not be delayed by the I/O.  Prefetching is optional.
422  * No-op if prefetching isn't compiled in.
423  */
424 void
425 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
426 {
427 #ifdef USE_PREFETCH
428         Assert(RelationIsValid(reln));
429         Assert(BlockNumberIsValid(blockNum));
430
431         /* Open it at the smgr level if not already done */
432         RelationOpenSmgr(reln);
433
434         if (RelationUsesLocalBuffers(reln))
435         {
436                 /* see comments in ReadBufferExtended */
437                 if (RELATION_IS_OTHER_TEMP(reln))
438                         ereport(ERROR,
439                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
440                                 errmsg("cannot access temporary tables of other sessions")));
441
442                 /* pass it off to localbuf.c */
443                 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
444         }
445         else
446         {
447                 BufferTag       newTag;         /* identity of requested block */
448                 uint32          newHash;        /* hash value for newTag */
449                 LWLock     *newPartitionLock;   /* buffer partition lock for it */
450                 int                     buf_id;
451
452                 /* create a tag so we can lookup the buffer */
453                 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
454                                            forkNum, blockNum);
455
456                 /* determine its hash code and partition lock ID */
457                 newHash = BufTableHashCode(&newTag);
458                 newPartitionLock = BufMappingPartitionLock(newHash);
459
460                 /* see if the block is in the buffer pool already */
461                 LWLockAcquire(newPartitionLock, LW_SHARED);
462                 buf_id = BufTableLookup(&newTag, newHash);
463                 LWLockRelease(newPartitionLock);
464
465                 /* If not in buffers, initiate prefetch */
466                 if (buf_id < 0)
467                         smgrprefetch(reln->rd_smgr, forkNum, blockNum);
468
469                 /*
470                  * If the block *is* in buffers, we do nothing.  This is not really
471                  * ideal: the block might be just about to be evicted, which would be
472                  * stupid since we know we are going to need it soon.  But the only
473                  * easy answer is to bump the usage_count, which does not seem like a
474                  * great solution: when the caller does ultimately touch the block,
475                  * usage_count would get bumped again, resulting in too much
476                  * favoritism for blocks that are involved in a prefetch sequence. A
477                  * real fix would involve some additional per-buffer state, and it's
478                  * not clear that there's enough of a problem to justify that.
479                  */
480         }
481 #endif   /* USE_PREFETCH */
482 }
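
/*
 * Sketch of the intended prefetch pattern for a sequential pass over a
 * relation (illustrative only; "rel" is assumed to be opened and locked by
 * the caller, and the per-page processing is elided):
 *
 *		BlockNumber nblocks = RelationGetNumberOfBlocks(rel);
 *		BlockNumber blkno;
 *
 *		for (blkno = 0; blkno < nblocks; blkno++)
 *		{
 *			Buffer	buf;
 *
 *			if (target_prefetch_pages > 0 &&
 *				blkno + target_prefetch_pages < nblocks)
 *				PrefetchBuffer(rel, MAIN_FORKNUM,
 *							   blkno + target_prefetch_pages);
 *
 *			buf = ReadBuffer(rel, blkno);
 *			(process the page while holding the pin)
 *			ReleaseBuffer(buf);
 *		}
 */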
483
484
485 /*
486  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
487  *              fork with RBM_NORMAL mode and default strategy.
488  */
489 Buffer
490 ReadBuffer(Relation reln, BlockNumber blockNum)
491 {
492         return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
493 }
494
495 /*
496  * ReadBufferExtended -- returns a buffer containing the requested
497  *              block of the requested relation.  If the blknum
498  *              requested is P_NEW, extend the relation file and
499  *              allocate a new block.  (Caller is responsible for
500  *              ensuring that only one backend tries to extend a
501  *              relation at the same time!)
502  *
503  * Returns: the buffer number for the buffer containing
504  *              the block read.  The returned buffer has been pinned.
505  *              Does not return on error --- elog's instead.
506  *
507  * Assumes that reln has already been opened when this function is called.
508  *
509  * In RBM_NORMAL mode, the page is read from disk, and the page header is
510  * validated.  An error is thrown if the page header is not valid.  (But
511  * note that an all-zero page is considered "valid"; see PageIsVerified().)
512  *
513  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
514  * valid, the page is zeroed instead of throwing an error. This is intended
515  * for non-critical data, where the caller is prepared to repair errors.
516  *
517  * In RBM_ZERO_AND_LOCK mode, if the page isn't in buffer cache already, it's
518  * filled with zeros instead of reading it from disk.  Useful when the caller
519  * is going to fill the page from scratch, since this saves I/O and avoids
520  * unnecessary failure if the page-on-disk has corrupt page headers.
521  * The page is returned locked to ensure that the caller has a chance to
522  * initialize the page before it's made visible to others.
523  * Caution: do not use this mode to read a page that is beyond the relation's
524  * current physical EOF; that is likely to cause problems in md.c when
525  * the page is modified and written out. P_NEW is OK, though.
526  *
527  * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
528  * a cleanup-strength lock on the page.
529  *
530  * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
531  *
532  * If strategy is not NULL, a nondefault buffer access strategy is used.
533  * See buffer/README for details.
534  */
535 Buffer
536 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
537                                    ReadBufferMode mode, BufferAccessStrategy strategy)
538 {
539         bool            hit;
540         Buffer          buf;
541
542         /* Open it at the smgr level if not already done */
543         RelationOpenSmgr(reln);
544
545         /*
546          * Reject attempts to read non-local temporary relations; we would be
547          * likely to get wrong data since we have no visibility into the owning
548          * session's local buffers.
549          */
550         if (RELATION_IS_OTHER_TEMP(reln))
551                 ereport(ERROR,
552                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
553                                  errmsg("cannot access temporary tables of other sessions")));
554
555         /*
556          * Read the buffer, and update pgstat counters to reflect a cache hit or
557          * miss.
558          */
559         pgstat_count_buffer_read(reln);
560         buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
561                                                         forkNum, blockNum, mode, strategy, &hit);
562         if (hit)
563                 pgstat_count_buffer_hit(reln);
564         return buf;
565 }
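
/*
 * Example of reading a whole relation with a nondefault buffer access
 * strategy, as described above (illustrative only; "rel" is assumed to be
 * opened and locked by the caller, and the per-page work is elided):
 *
 *		BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
 *		BlockNumber nblocks = RelationGetNumberOfBlocks(rel);
 *		BlockNumber blkno;
 *
 *		for (blkno = 0; blkno < nblocks; blkno++)
 *		{
 *			Buffer	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
 *											 RBM_NORMAL, strategy);
 *
 *			(process the page while holding the pin)
 *			ReleaseBuffer(buf);
 *		}
 *		FreeAccessStrategy(strategy);
 */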
566
567
568 /*
569  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
570  *              a relcache entry for the relation.
571  *
572  * NB: At present, this function may only be used on permanent relations, which
573  * is OK, because we only use it during XLOG replay.  If in the future we
574  * want to use it on temporary or unlogged relations, we could pass additional
575  * parameters.
576  */
577 Buffer
578 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
579                                                   BlockNumber blockNum, ReadBufferMode mode,
580                                                   BufferAccessStrategy strategy)
581 {
582         bool            hit;
583
584         SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
585
586         Assert(InRecovery);
587
588         return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
589                                                          mode, strategy, &hit);
590 }
591
592
593 /*
594  * ReadBuffer_common -- common logic for all ReadBuffer variants
595  *
596  * *hit is set to true if the request was satisfied from shared buffer cache.
597  */
598 static Buffer
599 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
600                                   BlockNumber blockNum, ReadBufferMode mode,
601                                   BufferAccessStrategy strategy, bool *hit)
602 {
603         volatile BufferDesc *bufHdr;
604         Block           bufBlock;
605         bool            found;
606         bool            isExtend;
607         bool            isLocalBuf = SmgrIsTemp(smgr);
608
609         *hit = false;
610
611         /* Make sure we will have room to remember the buffer pin */
612         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
613
614         isExtend = (blockNum == P_NEW);
615
616         TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
617                                                                            smgr->smgr_rnode.node.spcNode,
618                                                                            smgr->smgr_rnode.node.dbNode,
619                                                                            smgr->smgr_rnode.node.relNode,
620                                                                            smgr->smgr_rnode.backend,
621                                                                            isExtend);
622
623         /* Substitute proper block number if caller asked for P_NEW */
624         if (isExtend)
625                 blockNum = smgrnblocks(smgr, forkNum);
626
627         if (isLocalBuf)
628         {
629                 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
630                 if (found)
631                         pgBufferUsage.local_blks_hit++;
632                 else
633                         pgBufferUsage.local_blks_read++;
634         }
635         else
636         {
637                 /*
638                  * look up the buffer.  IO_IN_PROGRESS is set if the requested block is
639                  * not currently in memory.
640                  */
641                 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
642                                                          strategy, &found);
643                 if (found)
644                         pgBufferUsage.shared_blks_hit++;
645                 else
646                         pgBufferUsage.shared_blks_read++;
647         }
648
649         /* At this point we do NOT hold any locks. */
650
651         /* if it was already in the buffer pool, we're done */
652         if (found)
653         {
654                 if (!isExtend)
655                 {
656                         /* Just need to update stats before we exit */
657                         *hit = true;
658                         VacuumPageHit++;
659
660                         if (VacuumCostActive)
661                                 VacuumCostBalance += VacuumCostPageHit;
662
663                         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
664                                                                                           smgr->smgr_rnode.node.spcNode,
665                                                                                           smgr->smgr_rnode.node.dbNode,
666                                                                                           smgr->smgr_rnode.node.relNode,
667                                                                                           smgr->smgr_rnode.backend,
668                                                                                           isExtend,
669                                                                                           found);
670
671                         /*
672                          * In RBM_ZERO_AND_LOCK mode the caller expects the page to
673                          * be locked on return.
674                          */
675                         if (!isLocalBuf)
676                         {
677                                 if (mode == RBM_ZERO_AND_LOCK)
678                                         LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE);
679                                 else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
680                                         LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
681                         }
682
683                         return BufferDescriptorGetBuffer(bufHdr);
684                 }
685
686                 /*
687                  * We get here only in the corner case where we are trying to extend
688                  * the relation but we found a pre-existing buffer marked BM_VALID.
689                  * This can happen because mdread doesn't complain about reads beyond
690                  * EOF (when zero_damaged_pages is ON) and so a previous attempt to
691                  * read a block beyond EOF could have left a "valid" zero-filled
692                  * buffer.  Unfortunately, we have also seen this case occurring
693                  * because of buggy Linux kernels that sometimes return an
694                  * lseek(SEEK_END) result that doesn't account for a recent write. In
695                  * that situation, the pre-existing buffer would contain valid data
696                  * that we don't want to overwrite.  Since the legitimate case should
697                  * always have left a zero-filled buffer, complain if not PageIsNew.
698                  */
699                 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
700                 if (!PageIsNew((Page) bufBlock))
701                         ereport(ERROR,
702                          (errmsg("unexpected data beyond EOF in block %u of relation %s",
703                                          blockNum, relpath(smgr->smgr_rnode, forkNum)),
704                           errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
705
706                 /*
707                  * We *must* do smgrextend before succeeding, else the page will not
708                  * be reserved by the kernel, and the next P_NEW call will decide to
709                  * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
710                  * call that BufferAlloc didn't, and proceed.
711                  */
712                 if (isLocalBuf)
713                 {
714                         /* Only need to adjust flags */
715                         Assert(bufHdr->flags & BM_VALID);
716                         bufHdr->flags &= ~BM_VALID;
717                 }
718                 else
719                 {
720                         /*
721                          * Loop to handle the very small possibility that someone re-sets
722                          * BM_VALID between our clearing it and StartBufferIO inspecting
723                          * it.
724                          */
725                         do
726                         {
727                                 LockBufHdr(bufHdr);
728                                 Assert(bufHdr->flags & BM_VALID);
729                                 bufHdr->flags &= ~BM_VALID;
730                                 UnlockBufHdr(bufHdr);
731                         } while (!StartBufferIO(bufHdr, true));
732                 }
733         }
734
735         /*
736          * if we have gotten to this point, we have allocated a buffer for the
737          * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
738          * if it's a shared buffer.
739          *
740          * Note: if smgrextend fails, we will end up with a buffer that is
741          * allocated but not marked BM_VALID.  P_NEW will still select the same
742          * block number (because the relation didn't get any longer on disk) and
743          * so future attempts to extend the relation will find the same buffer (if
744          * it's not been recycled) but come right back here to try smgrextend
745          * again.
746          */
747         Assert(!(bufHdr->flags & BM_VALID));            /* spinlock not needed */
748
749         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
750
751         if (isExtend)
752         {
753                 /* new buffers are zero-filled */
754                 MemSet((char *) bufBlock, 0, BLCKSZ);
755                 /* don't set checksum for all-zero page */
756                 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
757         }
758         else
759         {
760                 /*
761                  * Read in the page, unless the caller intends to overwrite it and
762                  * just wants us to allocate a buffer.
763                  */
764                 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
765                         MemSet((char *) bufBlock, 0, BLCKSZ);
766                 else
767                 {
768                         instr_time      io_start,
769                                                 io_time;
770
771                         if (track_io_timing)
772                                 INSTR_TIME_SET_CURRENT(io_start);
773
774                         smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
775
776                         if (track_io_timing)
777                         {
778                                 INSTR_TIME_SET_CURRENT(io_time);
779                                 INSTR_TIME_SUBTRACT(io_time, io_start);
780                                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
781                                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
782                         }
783
784                         /* check for garbage data */
785                         if (!PageIsVerified((Page) bufBlock, blockNum))
786                         {
787                                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
788                                 {
789                                         ereport(WARNING,
790                                                         (errcode(ERRCODE_DATA_CORRUPTED),
791                                                          errmsg("invalid page in block %u of relation %s; zeroing out page",
792                                                                         blockNum,
793                                                                         relpath(smgr->smgr_rnode, forkNum))));
794                                         MemSet((char *) bufBlock, 0, BLCKSZ);
795                                 }
796                                 else
797                                         ereport(ERROR,
798                                                         (errcode(ERRCODE_DATA_CORRUPTED),
799                                                          errmsg("invalid page in block %u of relation %s",
800                                                                         blockNum,
801                                                                         relpath(smgr->smgr_rnode, forkNum))));
802                         }
803                 }
804         }
805
806         /*
807          * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
808          * the page as valid, to make sure that no other backend sees the zeroed
809          * page before the caller has had a chance to initialize it.
810          *
811          * Since no-one else can be looking at the page contents yet, there is no
812          * difference between an exclusive lock and a cleanup-strength lock.
813  * (Note that we cannot use LockBuffer() or LockBufferForCleanup() here,
814          * because they assert that the buffer is already valid.)
815          */
816         if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
817                 !isLocalBuf)
818         {
819                 LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE);
820         }
821
822         if (isLocalBuf)
823         {
824                 /* Only need to adjust flags */
825                 bufHdr->flags |= BM_VALID;
826         }
827         else
828         {
829                 /* Set BM_VALID, terminate IO, and wake up any waiters */
830                 TerminateBufferIO(bufHdr, false, BM_VALID);
831         }
832
833         VacuumPageMiss++;
834         if (VacuumCostActive)
835                 VacuumCostBalance += VacuumCostPageMiss;
836
837         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
838                                                                           smgr->smgr_rnode.node.spcNode,
839                                                                           smgr->smgr_rnode.node.dbNode,
840                                                                           smgr->smgr_rnode.node.relNode,
841                                                                           smgr->smgr_rnode.backend,
842                                                                           isExtend,
843                                                                           found);
844
845         return BufferDescriptorGetBuffer(bufHdr);
846 }
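
/*
 * Sketch of extending a relation by one page using P_NEW with
 * RBM_ZERO_AND_LOCK, per the notes on ReadBufferExtended above (illustrative
 * only; "rel" is assumed to be opened and locked, the caller is assumed to
 * serialize extension, and WAL logging and critical sections are omitted):
 *
 *		Buffer		buf;
 *		Page		page;
 *
 *		buf = ReadBufferExtended(rel, MAIN_FORKNUM, P_NEW,
 *								 RBM_ZERO_AND_LOCK, NULL);
 *		page = BufferGetPage(buf);
 *		PageInit(page, BufferGetPageSize(buf), 0);
 *		MarkBufferDirty(buf);
 *		UnlockReleaseBuffer(buf);
 */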
847
848 /*
849  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
850  *              buffer.  If no buffer exists already, selects a replacement
851  *              victim and evicts the old page, but does NOT read in new page.
852  *
853  * "strategy" can be a buffer replacement strategy object, or NULL for
854  * the default strategy.  The selected buffer's usage_count is advanced when
855  * using the default strategy, but otherwise possibly not (see PinBuffer).
856  *
857  * The returned buffer is pinned and is already marked as holding the
858  * desired page.  If it already did have the desired page, *foundPtr is
859  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
860  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
861  *
862  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
863  * we keep it for simplicity in ReadBuffer.
864  *
865  * No locks are held either at entry or exit.
866  */
867 static volatile BufferDesc *
868 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
869                         BlockNumber blockNum,
870                         BufferAccessStrategy strategy,
871                         bool *foundPtr)
872 {
873         BufferTag       newTag;                 /* identity of requested block */
874         uint32          newHash;                /* hash value for newTag */
875         LWLock     *newPartitionLock;           /* buffer partition lock for it */
876         BufferTag       oldTag;                 /* previous identity of selected buffer */
877         uint32          oldHash;                /* hash value for oldTag */
878         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
879         BufFlags        oldFlags;
880         int                     buf_id;
881         volatile BufferDesc *buf;
882         bool            valid;
883
884         /* create a tag so we can lookup the buffer */
885         INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
886
887         /* determine its hash code and partition lock ID */
888         newHash = BufTableHashCode(&newTag);
889         newPartitionLock = BufMappingPartitionLock(newHash);
890
891         /* see if the block is in the buffer pool already */
892         LWLockAcquire(newPartitionLock, LW_SHARED);
893         buf_id = BufTableLookup(&newTag, newHash);
894         if (buf_id >= 0)
895         {
896                 /*
897                  * Found it.  Now, pin the buffer so no one can steal it from the
898                  * buffer pool, and check to see if the correct data has been loaded
899                  * into the buffer.
900                  */
901                 buf = &BufferDescriptors[buf_id];
902
903                 valid = PinBuffer(buf, strategy);
904
905                 /* Can release the mapping lock as soon as we've pinned it */
906                 LWLockRelease(newPartitionLock);
907
908                 *foundPtr = TRUE;
909
910                 if (!valid)
911                 {
912                         /*
913                          * We can only get here if (a) someone else is still reading in
914                          * the page, or (b) a previous read attempt failed.  We have to
915                          * wait for any active read attempt to finish, and then set up our
916                          * own read attempt if the page is still not BM_VALID.
917                          * StartBufferIO does it all.
918                          */
919                         if (StartBufferIO(buf, true))
920                         {
921                                 /*
922                                  * If we get here, previous attempts to read the buffer must
923                                  * have failed ... but we shall bravely try again.
924                                  */
925                                 *foundPtr = FALSE;
926                         }
927                 }
928
929                 return buf;
930         }
931
932         /*
933          * Didn't find it in the buffer pool.  We'll have to initialize a new
934          * buffer.  Remember to unlock the mapping lock while doing the work.
935          */
936         LWLockRelease(newPartitionLock);
937
938         /* Loop here in case we have to try another victim buffer */
939         for (;;)
940         {
941                 /*
942                  * Ensure, while the spinlock's not yet held, that there's a free refcount
943                  * entry.
944                  */
945                 ReservePrivateRefCountEntry();
946
947                 /*
948                  * Select a victim buffer.  The buffer is returned with its header
949                  * spinlock still held!
950                  */
951                 buf = StrategyGetBuffer(strategy);
952
953                 Assert(buf->refcount == 0);
954
955                 /* Must copy buffer flags while we still hold the spinlock */
956                 oldFlags = buf->flags;
957
958                 /* Pin the buffer and then release the buffer spinlock */
959                 PinBuffer_Locked(buf);
960
961                 /*
962                  * If the buffer was dirty, try to write it out.  There is a race
963                  * condition here, in that someone might dirty it after we released it
964                  * above, or even while we are writing it out (since our share-lock
965                  * won't prevent hint-bit updates).  We will recheck the dirty bit
966                  * after re-locking the buffer header.
967                  */
968                 if (oldFlags & BM_DIRTY)
969                 {
970                         /*
971                          * We need a share-lock on the buffer contents to write it out
972                          * (else we might write invalid data, eg because someone else is
973                          * compacting the page contents while we write).  We must use a
974                          * conditional lock acquisition here to avoid deadlock.  Even
975                          * though the buffer was not pinned (and therefore surely not
976                          * locked) when StrategyGetBuffer returned it, someone else could
977                          * have pinned and exclusive-locked it by the time we get here. If
978                          * we try to get the lock unconditionally, we'd block waiting for
979                          * them; if they later block waiting for us, deadlock ensues.
980                          * (This has been observed to happen when two backends are both
981                          * trying to split btree index pages, and the second one just
982                          * happens to be trying to split the page the first one got from
983                          * StrategyGetBuffer.)
984                          */
985                         if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
986                         {
987                                 /*
988                                  * If using a nondefault strategy, and writing the buffer
989                                  * would require a WAL flush, let the strategy decide whether
990                                  * to go ahead and write/reuse the buffer or to choose another
991                                  * victim.  We need lock to inspect the page LSN, so this
992                                  * can't be done inside StrategyGetBuffer.
993                                  */
994                                 if (strategy != NULL)
995                                 {
996                                         XLogRecPtr      lsn;
997
998                                         /* Read the LSN while holding buffer header lock */
999                                         LockBufHdr(buf);
1000                                         lsn = BufferGetLSN(buf);
1001                                         UnlockBufHdr(buf);
1002
1003                                         if (XLogNeedsFlush(lsn) &&
1004                                                 StrategyRejectBuffer(strategy, buf))
1005                                         {
1006                                                 /* Drop lock/pin and loop around for another buffer */
1007                                                 LWLockRelease(buf->content_lock);
1008                                                 UnpinBuffer(buf, true);
1009                                                 continue;
1010                                         }
1011                                 }
1012
1013                                 /* OK, do the I/O */
1014                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
1015                                                                                            smgr->smgr_rnode.node.spcNode,
1016                                                                                                 smgr->smgr_rnode.node.dbNode,
1017                                                                                           smgr->smgr_rnode.node.relNode);
1018
1019                                 FlushBuffer(buf, NULL);
1020                                 LWLockRelease(buf->content_lock);
1021
1022                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
1023                                                                                            smgr->smgr_rnode.node.spcNode,
1024                                                                                                 smgr->smgr_rnode.node.dbNode,
1025                                                                                           smgr->smgr_rnode.node.relNode);
1026                         }
1027                         else
1028                         {
1029                                 /*
1030                                  * Someone else has locked the buffer, so give it up and loop
1031                                  * back to get another one.
1032                                  */
1033                                 UnpinBuffer(buf, true);
1034                                 continue;
1035                         }
1036                 }
1037
1038                 /*
1039                  * To change the association of a valid buffer, we'll need to have
1040                  * exclusive lock on both the old and new mapping partitions.
1041                  */
1042                 if (oldFlags & BM_TAG_VALID)
1043                 {
1044                         /*
1045                          * Need to compute the old tag's hashcode and partition lock ID.
1046                          * XXX is it worth storing the hashcode in BufferDesc so we need
1047                          * not recompute it here?  Probably not.
1048                          */
1049                         oldTag = buf->tag;
1050                         oldHash = BufTableHashCode(&oldTag);
1051                         oldPartitionLock = BufMappingPartitionLock(oldHash);
1052
1053                         /*
1054                          * Must lock the lower-numbered partition first to avoid
1055                          * deadlocks.
1056                          */
1057                         if (oldPartitionLock < newPartitionLock)
1058                         {
1059                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1060                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1061                         }
1062                         else if (oldPartitionLock > newPartitionLock)
1063                         {
1064                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1065                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1066                         }
1067                         else
1068                         {
1069                                 /* only one partition, only one lock */
1070                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1071                         }
1072                 }
1073                 else
1074                 {
1075                         /* if it wasn't valid, we need only the new partition */
1076                         LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1077                         /* these just keep the compiler quiet about uninit variables */
1078                         oldHash = 0;
1079                         oldPartitionLock = 0;
1080                 }
1081
1082                 /*
1083                  * Try to make a hashtable entry for the buffer under its new tag.
1084                  * This could fail because while we were writing someone else
1085                  * allocated another buffer for the same block we want to read in.
1086                  * Note that we have not yet removed the hashtable entry for the old
1087                  * tag.
1088                  */
1089                 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
1090
1091                 if (buf_id >= 0)
1092                 {
1093                         /*
1094                          * Got a collision. Someone has already done what we were about to
1095                          * do. We'll just handle this as if it were found in the buffer
1096                          * pool in the first place.  First, give up the buffer we were
1097                          * planning to use.
1098                          */
1099                         UnpinBuffer(buf, true);
1100
1101                         /* Can give up that buffer's mapping partition lock now */
1102                         if ((oldFlags & BM_TAG_VALID) &&
1103                                 oldPartitionLock != newPartitionLock)
1104                                 LWLockRelease(oldPartitionLock);
1105
1106                         /* remaining code should match code at top of routine */
1107
1108                         buf = &BufferDescriptors[buf_id];
1109
1110                         valid = PinBuffer(buf, strategy);
1111
1112                         /* Can release the mapping lock as soon as we've pinned it */
1113                         LWLockRelease(newPartitionLock);
1114
1115                         *foundPtr = TRUE;
1116
1117                         if (!valid)
1118                         {
1119                                 /*
1120                                  * We can only get here if (a) someone else is still reading
1121                                  * in the page, or (b) a previous read attempt failed.  We
1122                                  * have to wait for any active read attempt to finish, and
1123                                  * then set up our own read attempt if the page is still not
1124                                  * BM_VALID.  StartBufferIO does it all.
1125                                  */
1126                                 if (StartBufferIO(buf, true))
1127                                 {
1128                                         /*
1129                                          * If we get here, previous attempts to read the buffer
1130                                          * must have failed ... but we shall bravely try again.
1131                                          */
1132                                         *foundPtr = FALSE;
1133                                 }
1134                         }
1135
1136                         return buf;
1137                 }
1138
1139                 /*
1140                  * Need to lock the buffer header too in order to change its tag.
1141                  */
1142                 LockBufHdr(buf);
1143
1144                 /*
1145                  * Somebody could have pinned or re-dirtied the buffer while we were
1146                  * doing the I/O and making the new hashtable entry.  If so, we can't
1147                  * recycle this buffer; we must undo everything we've done and start
1148                  * over with a new victim buffer.
1149                  */
1150                 oldFlags = buf->flags;
1151                 if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
1152                         break;
1153
1154                 UnlockBufHdr(buf);
1155                 BufTableDelete(&newTag, newHash);
1156                 if ((oldFlags & BM_TAG_VALID) &&
1157                         oldPartitionLock != newPartitionLock)
1158                         LWLockRelease(oldPartitionLock);
1159                 LWLockRelease(newPartitionLock);
1160                 UnpinBuffer(buf, true);
1161         }
1162
1163         /*
1164          * Okay, it's finally safe to rename the buffer.
1165          *
1166          * Clearing BM_VALID here is necessary, clearing the dirtybits is just
1167          * paranoia.  We also reset the usage_count since any recency of use of
1168          * the old content is no longer relevant.  (The usage_count starts out at
1169          * 1 so that the buffer can survive one clock-sweep pass.)
1170          */
1171         buf->tag = newTag;
1172         buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
1173         if (relpersistence == RELPERSISTENCE_PERMANENT)
1174                 buf->flags |= BM_TAG_VALID | BM_PERMANENT;
1175         else
1176                 buf->flags |= BM_TAG_VALID;
1177         buf->usage_count = 1;
1178
1179         UnlockBufHdr(buf);
1180
1181         if (oldFlags & BM_TAG_VALID)
1182         {
1183                 BufTableDelete(&oldTag, oldHash);
1184                 if (oldPartitionLock != newPartitionLock)
1185                         LWLockRelease(oldPartitionLock);
1186         }
1187
1188         LWLockRelease(newPartitionLock);
1189
1190         /*
1191          * Buffer contents are currently invalid.  Try to get the io_in_progress
1192          * lock.  If StartBufferIO returns false, then someone else managed to
1193          * read it before we did, so there's nothing left for BufferAlloc() to do.
1194          */
1195         if (StartBufferIO(buf, true))
1196                 *foundPtr = FALSE;
1197         else
1198                 *foundPtr = TRUE;
1199
1200         return buf;
1201 }
1202
1203 /*
1204  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
1205  * freelist.
1206  *
1207  * The buffer header spinlock must be held at entry.  We drop it before
1208  * returning.  (This is sane because the caller must have locked the
1209  * buffer in order to be sure it should be dropped.)
1210  *
1211  * This is used only in contexts such as dropping a relation.  We assume
1212  * that no other backend could possibly be interested in using the page,
1213  * so the only reason the buffer might be pinned is if someone else is
1214  * trying to write it out.  We have to let them finish before we can
1215  * reclaim the buffer.
1216  *
1217  * The buffer could get reclaimed by someone else while we are waiting
1218  * to acquire the necessary locks; if so, don't mess it up.
1219  */
1220 static void
1221 InvalidateBuffer(volatile BufferDesc *buf)
1222 {
1223         BufferTag       oldTag;
1224         uint32          oldHash;                /* hash value for oldTag */
1225         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
1226         BufFlags        oldFlags;
1227
1228         /* Save the original buffer tag before dropping the spinlock */
1229         oldTag = buf->tag;
1230
1231         UnlockBufHdr(buf);
1232
1233         /*
1234          * Need to compute the old tag's hashcode and partition lock ID. XXX is it
1235          * worth storing the hashcode in BufferDesc so we need not recompute it
1236          * here?  Probably not.
1237          */
1238         oldHash = BufTableHashCode(&oldTag);
1239         oldPartitionLock = BufMappingPartitionLock(oldHash);
1240
1241 retry:
1242
1243         /*
1244          * Acquire exclusive mapping lock in preparation for changing the buffer's
1245          * association.
1246          */
1247         LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1248
1249         /* Re-lock the buffer header */
1250         LockBufHdr(buf);
1251
1252         /* If it's changed while we were waiting for lock, do nothing */
1253         if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
1254         {
1255                 UnlockBufHdr(buf);
1256                 LWLockRelease(oldPartitionLock);
1257                 return;
1258         }
1259
1260         /*
1261          * We assume the only reason for it to be pinned is that someone else is
1262          * flushing the page out.  Wait for them to finish.  (This could be an
1263          * infinite loop if the refcount is messed up... it would be nice to time
1264          * out after awhile, but there seems no way to be sure how many loops may
1265                  * out after a while, but there seems no way to be sure how many loops may
1266                  * be needed.  Note that if the other backend has pinned the buffer but not
1267          * be busy-looping here.)
1268          */
1269         if (buf->refcount != 0)
1270         {
1271                 UnlockBufHdr(buf);
1272                 LWLockRelease(oldPartitionLock);
1273                 /* safety check: should definitely not be our *own* pin */
1274                 if (GetPrivateRefCount(buf->buf_id) > 0)
1275                         elog(ERROR, "buffer is pinned in InvalidateBuffer");
1276                 WaitIO(buf);
1277                 goto retry;
1278         }
1279
1280         /*
1281          * Clear out the buffer's tag and flags.  We must do this to ensure that
1282          * linear scans of the buffer array don't think the buffer is valid.
1283          */
1284         oldFlags = buf->flags;
1285         CLEAR_BUFFERTAG(buf->tag);
1286         buf->flags = 0;
1287         buf->usage_count = 0;
1288
1289         UnlockBufHdr(buf);
1290
1291         /*
1292          * Remove the buffer from the lookup hashtable, if it was in there.
1293          */
1294         if (oldFlags & BM_TAG_VALID)
1295                 BufTableDelete(&oldTag, oldHash);
1296
1297         /*
1298          * Done with mapping lock.
1299          */
1300         LWLockRelease(oldPartitionLock);
1301
1302         /*
1303          * Insert the buffer at the head of the list of free buffers.
1304          */
1305         StrategyFreeBuffer(buf);
1306 }
1307
1308 /*
1309  * MarkBufferDirty
1310  *
1311  *              Marks buffer contents as dirty (actual write happens later).
1312  *
1313  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
1314  * exclusive lock, then somebody could be in the process of writing the buffer,
1315  * leading to risk of bad data written to disk.)
1316  */
1317 void
1318 MarkBufferDirty(Buffer buffer)
1319 {
1320         volatile BufferDesc *bufHdr;
1321
1322         if (!BufferIsValid(buffer))
1323                 elog(ERROR, "bad buffer ID: %d", buffer);
1324
1325         if (BufferIsLocal(buffer))
1326         {
1327                 MarkLocalBufferDirty(buffer);
1328                 return;
1329         }
1330
1331         bufHdr = &BufferDescriptors[buffer - 1];
1332
1333         Assert(BufferIsPinned(buffer));
1334         /* unfortunately we can't check if the lock is held exclusively */
1335         Assert(LWLockHeldByMe(bufHdr->content_lock));
1336
1337         LockBufHdr(bufHdr);
1338
1339         Assert(bufHdr->refcount > 0);
1340
1341         /*
1342          * If the buffer was not dirty already, do vacuum accounting.
1343          */
1344         if (!(bufHdr->flags & BM_DIRTY))
1345         {
1346                 VacuumPageDirty++;
1347                 pgBufferUsage.shared_blks_dirtied++;
1348                 if (VacuumCostActive)
1349                         VacuumCostBalance += VacuumCostPageDirty;
1350         }
1351
1352         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1353
1354         UnlockBufHdr(bufHdr);
1355 }
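
/*
 * A minimal usage sketch (hypothetical helper, not referenced anywhere in
 * this file) of the rule stated above: pin the buffer, take the content lock
 * in exclusive mode, modify the page, call MarkBufferDirty, and only then
 * unlock and unpin.  "rel" and "blkno" are assumed to be a valid Relation and
 * an existing block of it.
 */
static void
sketch_modify_block(Relation rel, BlockNumber blkno)
{
	Buffer		buf;

	buf = ReadBuffer(rel, blkno);
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	/* ... modify the page returned by BufferGetPage(buf) here ... */

	MarkBufferDirty(buf);

	/* ... normally WAL-log the change here, while still holding the lock ... */

	UnlockReleaseBuffer(buf);
}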
1356
1357 /*
1358  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1359  *
1360  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1361  * compared to calling the two routines separately.  Now it's mainly just
1362  * a convenience function.  However, if the passed buffer is valid and
1363  * already contains the desired block, we just return it as-is; and that
1364  * does save considerable work compared to a full release and reacquire.
1365  *
1366  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1367  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1368  * but can save some tests in the caller.
1369  */
1370 Buffer
1371 ReleaseAndReadBuffer(Buffer buffer,
1372                                          Relation relation,
1373                                          BlockNumber blockNum)
1374 {
1375         ForkNumber      forkNum = MAIN_FORKNUM;
1376         volatile BufferDesc *bufHdr;
1377
1378         if (BufferIsValid(buffer))
1379         {
1380                 Assert(BufferIsPinned(buffer));
1381                 if (BufferIsLocal(buffer))
1382                 {
1383                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
1384                         if (bufHdr->tag.blockNum == blockNum &&
1385                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1386                                 bufHdr->tag.forkNum == forkNum)
1387                                 return buffer;
1388                         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1389                         LocalRefCount[-buffer - 1]--;
1390                 }
1391                 else
1392                 {
1393                         bufHdr = &BufferDescriptors[buffer - 1];
1394                         /* we have pin, so it's ok to examine tag without spinlock */
1395                         if (bufHdr->tag.blockNum == blockNum &&
1396                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1397                                 bufHdr->tag.forkNum == forkNum)
1398                                 return buffer;
1399                         UnpinBuffer(bufHdr, true);
1400                 }
1401         }
1402
1403         return ReadBuffer(relation, blockNum);
1404 }
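
/*
 * Usage sketch (hypothetical helper, not referenced in this file): walking a
 * relation block by block with ReleaseAndReadBuffer avoids an explicit
 * ReleaseBuffer/ReadBuffer pair per block, and returns the same buffer
 * unchanged when the desired block is already pinned.  "rel" and "nblocks"
 * are assumed to describe a valid, suitably locked relation.
 */
static void
sketch_walk_blocks(Relation rel, BlockNumber nblocks)
{
	Buffer		buf = InvalidBuffer;
	BlockNumber blkno;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		/* releases the previous pin (if any) and pins block blkno */
		buf = ReleaseAndReadBuffer(buf, rel, blkno);

		/* ... inspect BufferGetPage(buf) under a content lock ... */
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
}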
1405
1406 /*
1407  * PinBuffer -- make buffer unavailable for replacement.
1408  *
1409  * For the default access strategy, the buffer's usage_count is incremented
1410  * when we first pin it; for other strategies we just make sure the usage_count
1411  * isn't zero.  (The idea of the latter is that we don't want synchronized
1412  * heap scans to inflate the count, but we need it to not be zero to discourage
1413  * other backends from stealing buffers from our ring.  As long as we cycle
1414  * through the ring faster than the global clock-sweep cycles, buffers in
1415  * our ring won't be chosen as victims for replacement by other backends.)
1416  *
1417  * This should be applied only to shared buffers, never local ones.
1418  *
1419  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1420  *
1421  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
1422  * some callers to avoid an extra spinlock cycle.
1423  */
1424 static bool
1425 PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
1426 {
1427         int                     b = buf->buf_id;
1428         bool            result;
1429         PrivateRefCountEntry *ref;
1430
1431         ref = GetPrivateRefCountEntry(b + 1, true);
1432
1433         if (ref == NULL)
1434         {
1435                 ReservePrivateRefCountEntry();
1436                 ref = NewPrivateRefCountEntry(b + 1);
1437
1438                 LockBufHdr(buf);
1439                 buf->refcount++;
1440                 if (strategy == NULL)
1441                 {
1442                         if (buf->usage_count < BM_MAX_USAGE_COUNT)
1443                                 buf->usage_count++;
1444                 }
1445                 else
1446                 {
1447                         if (buf->usage_count == 0)
1448                                 buf->usage_count = 1;
1449                 }
1450                 result = (buf->flags & BM_VALID) != 0;
1451                 UnlockBufHdr(buf);
1452         }
1453         else
1454         {
1455                 /* If we previously pinned the buffer, it must surely be valid */
1456                 result = true;
1457         }
1458
1459         ref->refcount++;
1460         Assert(ref->refcount > 0);
1461         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1462                                                                 BufferDescriptorGetBuffer(buf));
1463         return result;
1464 }
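
/*
 * Worked example of the usage_count policy described in PinBuffer's header
 * comment (illustrative numbers only): three successive pins by backends
 * that do not already hold a pin push a default-strategy buffer to
 * usage_count = 3 (subject to the BM_MAX_USAGE_COUNT cap), so it can survive
 * three clock-sweep passes before becoming a victim candidate.  Under a ring
 * strategy the count is merely kept at 1, so the buffer survives only one
 * pass and is soon recycled within the ring.
 */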
1465
1466 /*
1467  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1468  * The spinlock is released before return.
1469  *
1470  * As this function is called with the spinlock held, the caller must have
1471  * called ReservePrivateRefCountEntry() beforehand.
1472  *
1473  * Currently, no callers of this function want to modify the buffer's
1474  * usage_count at all, so there's no need for a strategy parameter.
1475  * Also we don't bother with a BM_VALID test (the caller could check that for
1476  * itself).
1477  *
1478  * All callers use this function only when it is known that the buffer cannot
1479  * already be pinned by this backend.  That allows us to skip searching the
1480  * private refcount array and hash, which matters because the spinlock is
1481  * still held.
1482  *
1483  * Note: use of this routine is frequently mandatory, not just an optimization
1484  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1485  * its state can change under us.
1486  */
1487 static void
1488 PinBuffer_Locked(volatile BufferDesc *buf)
1489 {
1490         int                     b = buf->buf_id;
1491         PrivateRefCountEntry *ref;
1492
1493         /*
1494          * As explained above, we don't expect any preexisting pins.  That allows
1495          * us to manipulate the private refcount entry after releasing the spinlock.
1496          */
1497         Assert(GetPrivateRefCountEntry(b + 1, false) == NULL);
1498
1499         buf->refcount++;
1500         UnlockBufHdr(buf);
1501
1502         ref = NewPrivateRefCountEntry(b + 1);
1503         ref->refcount++;
1504
1505         ResourceOwnerRememberBuffer(CurrentResourceOwner,
1506                                                                 BufferDescriptorGetBuffer(buf));
1507 }
1508
1509 /*
1510  * UnpinBuffer -- make buffer available for replacement.
1511  *
1512  * This should be applied only to shared buffers, never local ones.
1513  *
1514  * Most but not all callers want CurrentResourceOwner to be adjusted.
1515  * Those that don't should pass fixOwner = FALSE.
1516  */
1517 static void
1518 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
1519 {
1520         PrivateRefCountEntry *ref;
1521
1522         /* not moving the entry into the array, as we're likely deleting it soon anyway */
1523         ref = GetPrivateRefCountEntry(buf->buf_id + 1, false);
1524         Assert(ref != NULL);
1525
1526         if (fixOwner)
1527                 ResourceOwnerForgetBuffer(CurrentResourceOwner,
1528                                                                   BufferDescriptorGetBuffer(buf));
1529
1530         Assert(ref->refcount > 0);
1531         ref->refcount--;
1532         if (ref->refcount == 0)
1533         {
1534                 /* I'd better not still hold any locks on the buffer */
1535                 Assert(!LWLockHeldByMe(buf->content_lock));
1536                 Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
1537
1538                 LockBufHdr(buf);
1539
1540                 /* Decrement the shared reference count */
1541                 Assert(buf->refcount > 0);
1542                 buf->refcount--;
1543
1544                 /* Support LockBufferForCleanup() */
1545                 if ((buf->flags & BM_PIN_COUNT_WAITER) &&
1546                         buf->refcount == 1)
1547                 {
1548                         /* we just released the last pin other than the waiter's */
1549                         int                     wait_backend_pid = buf->wait_backend_pid;
1550
1551                         buf->flags &= ~BM_PIN_COUNT_WAITER;
1552                         UnlockBufHdr(buf);
1553                         ProcSendSignal(wait_backend_pid);
1554                 }
1555                 else
1556                         UnlockBufHdr(buf);
1557
1558                 ForgetPrivateRefCountEntry(ref);
1559         }
1560 }
1561
1562 /*
1563  * BufferSync -- Write out all dirty buffers in the pool.
1564  *
1565  * This is called at checkpoint time to write out all dirty shared buffers.
1566  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1567  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
1568  * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
1569  * unlogged buffers, which are otherwise skipped.  The remaining flags
1570  * currently have no effect here.
1571  */
1572 static void
1573 BufferSync(int flags)
1574 {
1575         int                     buf_id;
1576         int                     num_to_scan;
1577         int                     num_to_write;
1578         int                     num_written;
1579         int                     mask = BM_DIRTY;
1580
1581         /* Make sure we can handle the pin inside SyncOneBuffer */
1582         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1583
1584         /*
1585          * Unless this is a shutdown checkpoint, an end-of-recovery checkpoint, or
1586          * we have been explicitly told to flush everything, we write only
1587          * permanent, dirty buffers; otherwise we write all dirty buffers.
1588          */
1589         if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
1590                                         CHECKPOINT_FLUSH_ALL))))
1591                 mask |= BM_PERMANENT;
1592
1593         /*
1594          * Loop over all buffers, and mark the ones that need to be written with
1595          * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
1596          * can estimate how much work needs to be done.
1597          *
1598          * This allows us to write only those pages that were dirty when the
1599          * checkpoint began, and not those that get dirtied while it proceeds.
1600          * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1601          * later in this function, or by normal backends or the bgwriter cleaning
1602          * scan, the flag is cleared.  Any buffer dirtied after this point won't
1603          * have the flag set.
1604          *
1605          * Note that if we fail to write some buffer, we may leave buffers with
1606          * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
1607          * certainly need to be written for the next checkpoint attempt, too.
1608          */
1609         num_to_write = 0;
1610         for (buf_id = 0; buf_id < NBuffers; buf_id++)
1611         {
1612                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1613
1614                 /*
1615                  * Header spinlock is enough to examine BM_DIRTY; see the comment in
1616                  * SyncOneBuffer.
1617                  */
1618                 LockBufHdr(bufHdr);
1619
1620                 if ((bufHdr->flags & mask) == mask)
1621                 {
1622                         bufHdr->flags |= BM_CHECKPOINT_NEEDED;
1623                         num_to_write++;
1624                 }
1625
1626                 UnlockBufHdr(bufHdr);
1627         }
1628
1629         if (num_to_write == 0)
1630                 return;                                 /* nothing to do */
1631
1632         TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
1633
1634         /*
1635          * Loop over all buffers again, and write the ones (still) marked with
1636          * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
1637          * since we might as well dump soon-to-be-recycled buffers first.
1638          *
1639          * Note that we don't read the buffer alloc count here --- that should be
1640          * left untouched till the next BgBufferSync() call.
1641          */
1642         buf_id = StrategySyncStart(NULL, NULL);
1643         num_to_scan = NBuffers;
1644         num_written = 0;
1645         while (num_to_scan-- > 0)
1646         {
1647                 volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
1648
1649                 /*
1650                  * We don't need to acquire the lock here, because we're only looking
1651                  * at a single bit. It's possible that someone else writes the buffer
1652                  * and clears the flag right after we check, but that doesn't matter
1653                  * since SyncOneBuffer will then do nothing.  However, there is a
1654                  * further race condition: it's conceivable that between the time we
1655                  * examine the bit here and the time SyncOneBuffer acquires lock,
1656                  * someone else not only wrote the buffer but replaced it with another
1657                  * page and dirtied it.  In that improbable case, SyncOneBuffer will
1658                  * write the buffer even though we didn't need to.  It doesn't seem worth
1659                  * guarding against this, though.
1660                  */
1661                 if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
1662                 {
1663                         if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
1664                         {
1665                                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1666                                 BgWriterStats.m_buf_written_checkpoints++;
1667                                 num_written++;
1668
1669                                 /*
1670                                  * We know there are at most num_to_write buffers with
1671                                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
1672                                  * num_written reaches num_to_write.
1673                                  *
1674                                  * Note that num_written doesn't include buffers written by
1675                                  * other backends, or by the bgwriter cleaning scan. That
1676                                  * means that the estimate of how much progress we've made is
1677                                  * conservative, and also that this test will often fail to
1678                                  * trigger.  But it seems worth making anyway.
1679                                  */
1680                                 if (num_written >= num_to_write)
1681                                         break;
1682
1683                                 /*
1684                                  * Sleep to throttle our I/O rate.
1685                                  */
1686                                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
1687                         }
1688                 }
1689
1690                 if (++buf_id >= NBuffers)
1691                         buf_id = 0;
1692         }
1693
1694         /*
1695          * Update checkpoint statistics. As noted above, this doesn't include
1696          * buffers written by other backends or bgwriter scan.
1697          */
1698         CheckpointStats.ckpt_bufs_written += num_written;
1699
1700         TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
1701 }
1702
1703 /*
1704  * BgBufferSync -- Write out some dirty buffers in the pool.
1705  *
1706  * This is called periodically by the background writer process.
1707  *
1708  * Returns true if it's appropriate for the bgwriter process to go into
1709  * low-power hibernation mode.  (This happens if the strategy clock sweep
1710  * has been "lapped" and no buffer allocations have occurred recently,
1711  * or if the bgwriter has been effectively disabled by setting
1712  * bgwriter_lru_maxpages to 0.)
1713  */
1714 bool
1715 BgBufferSync(void)
1716 {
1717         /* info obtained from freelist.c */
1718         int                     strategy_buf_id;
1719         uint32          strategy_passes;
1720         uint32          recent_alloc;
1721
1722         /*
1723          * Information saved between calls so we can determine the strategy
1724          * point's advance rate and avoid scanning already-cleaned buffers.
1725          */
1726         static bool saved_info_valid = false;
1727         static int      prev_strategy_buf_id;
1728         static uint32 prev_strategy_passes;
1729         static int      next_to_clean;
1730         static uint32 next_passes;
1731
1732         /* Moving averages of allocation rate and clean-buffer density */
1733         static float smoothed_alloc = 0;
1734         static float smoothed_density = 10.0;
1735
1736         /* Potentially these could be tunables, but for now, not */
1737         float           smoothing_samples = 16;
1738         float           scan_whole_pool_milliseconds = 120000.0;
1739
1740         /* Used to compute how far we scan ahead */
1741         long            strategy_delta;
1742         int                     bufs_to_lap;
1743         int                     bufs_ahead;
1744         float           scans_per_alloc;
1745         int                     reusable_buffers_est;
1746         int                     upcoming_alloc_est;
1747         int                     min_scan_buffers;
1748
1749         /* Variables for the scanning loop proper */
1750         int                     num_to_scan;
1751         int                     num_written;
1752         int                     reusable_buffers;
1753
1754         /* Variables for final smoothed_density update */
1755         long            new_strategy_delta;
1756         uint32          new_recent_alloc;
1757
1758         /*
1759          * Find out where the freelist clock sweep currently is, and how many
1760          * buffer allocations have happened since our last call.
1761          */
1762         strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
1763
1764         /* Report buffer alloc counts to pgstat */
1765         BgWriterStats.m_buf_alloc += recent_alloc;
1766
1767         /*
1768          * If we're not running the LRU scan, just stop after doing the stats
1769          * stuff.  We mark the saved state invalid so that we can recover sanely
1770          * if LRU scan is turned back on later.
1771          */
1772         if (bgwriter_lru_maxpages <= 0)
1773         {
1774                 saved_info_valid = false;
1775                 return true;
1776         }
1777
1778         /*
1779          * Compute strategy_delta = how many buffers have been scanned by the
1780          * clock sweep since last time.  If first time through, assume none. Then
1781          * see if we are still ahead of the clock sweep, and if so, how many
1782          * buffers we could scan before we'd catch up with it and "lap" it. Note:
1783          * weird-looking coding of xxx_passes comparisons are to avoid bogus
1784          * weird-looking coding of the xxx_passes comparisons is to avoid bogus
1785          */
1786         if (saved_info_valid)
1787         {
1788                 int32           passes_delta = strategy_passes - prev_strategy_passes;
1789
1790                 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
1791                 strategy_delta += (long) passes_delta * NBuffers;
1792
1793                 Assert(strategy_delta >= 0);
1794
1795                 if ((int32) (next_passes - strategy_passes) > 0)
1796                 {
1797                         /* we're one pass ahead of the strategy point */
1798                         bufs_to_lap = strategy_buf_id - next_to_clean;
1799 #ifdef BGW_DEBUG
1800                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1801                                  next_passes, next_to_clean,
1802                                  strategy_passes, strategy_buf_id,
1803                                  strategy_delta, bufs_to_lap);
1804 #endif
1805                 }
1806                 else if (next_passes == strategy_passes &&
1807                                  next_to_clean >= strategy_buf_id)
1808                 {
1809                         /* on same pass, but ahead or at least not behind */
1810                         bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
1811 #ifdef BGW_DEBUG
1812                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1813                                  next_passes, next_to_clean,
1814                                  strategy_passes, strategy_buf_id,
1815                                  strategy_delta, bufs_to_lap);
1816 #endif
1817                 }
1818                 else
1819                 {
1820                         /*
1821                          * We're behind, so skip forward to the strategy point and start
1822                          * cleaning from there.
1823                          */
1824 #ifdef BGW_DEBUG
1825                         elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
1826                                  next_passes, next_to_clean,
1827                                  strategy_passes, strategy_buf_id,
1828                                  strategy_delta);
1829 #endif
1830                         next_to_clean = strategy_buf_id;
1831                         next_passes = strategy_passes;
1832                         bufs_to_lap = NBuffers;
1833                 }
1834         }
1835         else
1836         {
1837                 /*
1838                  * Initializing at startup or after LRU scanning has been off.  Always
1839                  * start at the strategy point.
1840                  */
1841 #ifdef BGW_DEBUG
1842                 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
1843                          strategy_passes, strategy_buf_id);
1844 #endif
1845                 strategy_delta = 0;
1846                 next_to_clean = strategy_buf_id;
1847                 next_passes = strategy_passes;
1848                 bufs_to_lap = NBuffers;
1849         }
1850
1851         /* Update saved info for next time */
1852         prev_strategy_buf_id = strategy_buf_id;
1853         prev_strategy_passes = strategy_passes;
1854         saved_info_valid = true;
1855
1856         /*
1857          * Compute how many buffers had to be scanned for each new allocation, i.e.,
1858          * 1/density of reusable buffers, and track a moving average of that.
1859          *
1860          * If the strategy point didn't move, we don't update the density estimate.
1861          */
1862         if (strategy_delta > 0 && recent_alloc > 0)
1863         {
1864                 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
1865                 smoothed_density += (scans_per_alloc - smoothed_density) /
1866                         smoothing_samples;
1867         }
1868
1869         /*
1870          * Estimate how many reusable buffers there are between the current
1871          * strategy point and where we've scanned ahead to, based on the smoothed
1872          * density estimate.
1873          */
1874         bufs_ahead = NBuffers - bufs_to_lap;
1875         reusable_buffers_est = (float) bufs_ahead / smoothed_density;
1876
1877         /*
1878          * Track a moving average of recent buffer allocations.  Here, rather than
1879          * a true average we want a fast-attack, slow-decline behavior: we
1880          * immediately follow any increase.
1881          */
1882         if (smoothed_alloc <= (float) recent_alloc)
1883                 smoothed_alloc = recent_alloc;
1884         else
1885                 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
1886                         smoothing_samples;
1887
1888         /* Scale the estimate by a GUC to allow more aggressive tuning. */
1889         upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
1890
1891         /*
1892          * If recent_alloc remains at zero for many cycles, smoothed_alloc will
1893          * eventually underflow to zero, and the underflows produce annoying
1894          * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
1895          * zero, there's no point in tracking smaller and smaller values of
1896          * smoothed_alloc, so just reset it to exactly zero to avoid this
1897          * syndrome.  It will pop back up as soon as recent_alloc increases.
1898          */
1899         if (upcoming_alloc_est == 0)
1900                 smoothed_alloc = 0;
1901
1902         /*
1903          * Even in cases where there's been little or no buffer allocation
1904          * activity, we want to make a small amount of progress through the buffer
1905          * cache so that as many reusable buffers as possible are clean after an
1906          * idle period.
1907          *
1908          * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
1909          * the BGW will be called during the scan_whole_pool time; slice the
1910          * buffer pool into that many sections.
1911          */
1912         min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
1913
1914         if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
1915         {
1916 #ifdef BGW_DEBUG
1917                 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
1918                          upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
1919 #endif
1920                 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
1921         }
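
	/*
	 * Worked example of the pacing arithmetic above (illustrative numbers,
	 * assuming the defaults shared_buffers = 128MB, i.e. NBuffers = 16384,
	 * and bgwriter_delay = 200ms): min_scan_buffers = 16384 / (120000 / 200)
	 * = 27, so even an idle system advances far enough each round to account
	 * for about 27 more reusable buffers.  If smoothed_alloc = 500 and
	 * bgwriter_lru_multiplier = 2.0, then upcoming_alloc_est = 1000, and the
	 * LRU scan below runs until it has accounted for that many reusable
	 * buffers, laps the strategy point, or hits bgwriter_lru_maxpages.
	 */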
1922
1923         /*
1924          * Now write out dirty reusable buffers, working forward from the
1925          * next_to_clean point, until we have lapped the strategy scan, or cleaned
1926          * enough buffers to match our estimate of the next cycle's allocation
1927          * requirements, or hit the bgwriter_lru_maxpages limit.
1928          */
1929
1930         /* Make sure we can handle the pin inside SyncOneBuffer */
1931         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1932
1933         num_to_scan = bufs_to_lap;
1934         num_written = 0;
1935         reusable_buffers = reusable_buffers_est;
1936
1937         /* Execute the LRU scan */
1938         while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
1939         {
1940                 int                     buffer_state = SyncOneBuffer(next_to_clean, true);
1941
1942                 if (++next_to_clean >= NBuffers)
1943                 {
1944                         next_to_clean = 0;
1945                         next_passes++;
1946                 }
1947                 num_to_scan--;
1948
1949                 if (buffer_state & BUF_WRITTEN)
1950                 {
1951                         reusable_buffers++;
1952                         if (++num_written >= bgwriter_lru_maxpages)
1953                         {
1954                                 BgWriterStats.m_maxwritten_clean++;
1955                                 break;
1956                         }
1957                 }
1958                 else if (buffer_state & BUF_REUSABLE)
1959                         reusable_buffers++;
1960         }
1961
1962         BgWriterStats.m_buf_written_clean += num_written;
1963
1964 #ifdef BGW_DEBUG
1965         elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
1966                  recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
1967                  smoothed_density, reusable_buffers_est, upcoming_alloc_est,
1968                  bufs_to_lap - num_to_scan,
1969                  num_written,
1970                  reusable_buffers - reusable_buffers_est);
1971 #endif
1972
1973         /*
1974          * Consider the above scan as being like a new allocation scan.
1975          * Characterize its density and update the smoothed one based on it. This
1976          * effectively halves the moving average period in cases where both the
1977          * strategy and the background writer are doing some useful scanning,
1978          * which is helpful because a long memory isn't as desirable for the
1979          * density estimate.
1980          */
1981         new_strategy_delta = bufs_to_lap - num_to_scan;
1982         new_recent_alloc = reusable_buffers - reusable_buffers_est;
1983         if (new_strategy_delta > 0 && new_recent_alloc > 0)
1984         {
1985                 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
1986                 smoothed_density += (scans_per_alloc - smoothed_density) /
1987                         smoothing_samples;
1988
1989 #ifdef BGW_DEBUG
1990                 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
1991                          new_recent_alloc, new_strategy_delta,
1992                          scans_per_alloc, smoothed_density);
1993 #endif
1994         }
1995
1996         /* Return true if OK to hibernate */
1997         return (bufs_to_lap == 0 && recent_alloc == 0);
1998 }
1999
2000 /*
2001  * SyncOneBuffer -- process a single buffer during syncing.
2002  *
2003  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
2004  * buffers marked recently used, as these are not replacement candidates.
2005  *
2006  * Returns a bitmask containing the following flag bits:
2007  *      BUF_WRITTEN: we wrote the buffer.
2008  *      BUF_REUSABLE: buffer is available for replacement, ie, it has
2009  *              pin count 0 and usage count 0.
2010  *
2011  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
2012  * after locking it, but we don't care all that much.)
2013  *
2014  * Note: caller must have done ResourceOwnerEnlargeBuffers.
2015  */
2016 static int
2017 SyncOneBuffer(int buf_id, bool skip_recently_used)
2018 {
2019         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
2020         int                     result = 0;
2021
2022         ReservePrivateRefCountEntry();
2023
2024         /*
2025          * Check whether buffer needs writing.
2026          *
2027          * We can make this check without taking the buffer content lock so long
2028          * as we mark pages dirty in access methods *before* logging changes with
2029          * XLogInsert(): if someone marks the buffer dirty just after our check, we
2030          * need not worry, because our checkpoint.redo points before the log record
2031          * for the upcoming changes, so we are not required to write such a dirty buffer.
2032          */
2033         LockBufHdr(bufHdr);
2034
2035         if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
2036                 result |= BUF_REUSABLE;
2037         else if (skip_recently_used)
2038         {
2039                 /* Caller told us not to write recently-used buffers */
2040                 UnlockBufHdr(bufHdr);
2041                 return result;
2042         }
2043
2044         if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
2045         {
2046                 /* It's clean, so nothing to do */
2047                 UnlockBufHdr(bufHdr);
2048                 return result;
2049         }
2050
2051         /*
2052          * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
2053          * buffer is clean by the time we've locked it.)
2054          */
2055         PinBuffer_Locked(bufHdr);
2056         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2057
2058         FlushBuffer(bufHdr, NULL);
2059
2060         LWLockRelease(bufHdr->content_lock);
2061         UnpinBuffer(bufHdr, true);
2062
2063         return result | BUF_WRITTEN;
2064 }
2065
2066 /*
2067  *              AtEOXact_Buffers - clean up at end of transaction.
2068  *
2069  *              As of PostgreSQL 8.0, buffer pins should get released by the
2070  *              ResourceOwner mechanism.  This routine is just a debugging
2071  *              cross-check that no pins remain.
2072  */
2073 void
2074 AtEOXact_Buffers(bool isCommit)
2075 {
2076         CheckForBufferLeaks();
2077
2078         AtEOXact_LocalBuffers(isCommit);
2079
2080         Assert(PrivateRefCountOverflowed == 0);
2081 }
2082
2083 /*
2084  * Initialize access to shared buffer pool
2085  *
2086  * This is called during backend startup (whether standalone or under the
2087  * postmaster).  It sets up for this backend's access to the already-existing
2088  * buffer pool.
2089  *
2090  * NB: this is called before InitProcess(), so we do not have a PGPROC and
2091  * cannot do LWLockAcquire; hence we can't actually access stuff in
2092  * shared memory yet.  We are only initializing local data here.
2093  * (See also InitBufferPoolBackend)
2094  */
2095 void
2096 InitBufferPoolAccess(void)
2097 {
2098         HASHCTL         hash_ctl;
2099
2100         memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
2101
2102         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2103         hash_ctl.keysize = sizeof(int32);
2104         hash_ctl.entrysize = sizeof(PrivateRefCountEntry);
2105
2106         PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
2107                                                                           HASH_ELEM | HASH_BLOBS);
2108 }
2109
2110 /*
2111  * InitBufferPoolBackend --- second-stage initialization of a new backend
2112  *
2113  * This is called after we have acquired a PGPROC and so can safely get
2114  * LWLocks.  We don't currently need to do anything at this stage ...
2115  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
2116  * access, and thereby has to be called at the corresponding phase of
2117  * backend shutdown.
2118  */
2119 void
2120 InitBufferPoolBackend(void)
2121 {
2122         on_shmem_exit(AtProcExit_Buffers, 0);
2123 }
2124
2125 /*
2126  * During backend exit, ensure that we released all shared-buffer locks and
2127  * assert that we have no remaining pins.
2128  */
2129 static void
2130 AtProcExit_Buffers(int code, Datum arg)
2131 {
2132         AbortBufferIO();
2133         UnlockBuffers();
2134
2135         CheckForBufferLeaks();
2136
2137         /* localbuf.c needs a chance too */
2138         AtProcExit_LocalBuffers();
2139 }
2140
2141 /*
2142  *              CheckForBufferLeaks - ensure this backend holds no buffer pins
2143  *
2144  *              As of PostgreSQL 8.0, buffer pins should get released by the
2145  *              ResourceOwner mechanism.  This routine is just a debugging
2146  *              cross-check that no pins remain.
2147  */
2148 static void
2149 CheckForBufferLeaks(void)
2150 {
2151 #ifdef USE_ASSERT_CHECKING
2152         int                     RefCountErrors = 0;
2153         PrivateRefCountEntry *res;
2154         int                     i;
2155
2156         /* check the array */
2157         for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
2158         {
2159                 res = &PrivateRefCountArray[i];
2160
2161                 if (res->buffer != InvalidBuffer)
2162                 {
2163                         PrintBufferLeakWarning(res->buffer);
2164                         RefCountErrors++;
2165                 }
2166         }
2167
2168         /* if necessary, search the hash */
2169         if (PrivateRefCountOverflowed)
2170         {
2171                 HASH_SEQ_STATUS hstat;
2172
2173                 hash_seq_init(&hstat, PrivateRefCountHash);
2174                 while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
2175                 {
2176                         PrintBufferLeakWarning(res->buffer);
2177                         RefCountErrors++;
2178                 }
2179         }
2180
2181         Assert(RefCountErrors == 0);
2182 #endif
2183 }
2184
2185 /*
2186  * Helper routine to issue warnings when a buffer is unexpectedly pinned
2187  */
2188 void
2189 PrintBufferLeakWarning(Buffer buffer)
2190 {
2191         volatile BufferDesc *buf;
2192         int32           loccount;
2193         char       *path;
2194         BackendId       backend;
2195
2196         Assert(BufferIsValid(buffer));
2197         if (BufferIsLocal(buffer))
2198         {
2199                 buf = &LocalBufferDescriptors[-buffer - 1];
2200                 loccount = LocalRefCount[-buffer - 1];
2201                 backend = MyBackendId;
2202         }
2203         else
2204         {
2205                 buf = &BufferDescriptors[buffer - 1];
2206                 loccount = GetPrivateRefCount(buffer);
2207                 backend = InvalidBackendId;
2208         }
2209
2210         /* theoretically we should lock the bufhdr here */
2211         path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
2212         elog(WARNING,
2213                  "buffer refcount leak: [%03d] "
2214                  "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
2215                  buffer, path,
2216                  buf->tag.blockNum, buf->flags,
2217                  buf->refcount, loccount);
2218         pfree(path);
2219 }
2220
2221 /*
2222  * CheckPointBuffers
2223  *
2224  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
2225  *
2226  * Note: temporary relations do not participate in checkpoints, so they don't
2227  * need to be flushed.
2228  */
2229 void
2230 CheckPointBuffers(int flags)
2231 {
2232         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
2233         CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
2234         BufferSync(flags);
2235         CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
2236         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
2237         smgrsync();
2238         CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
2239         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
2240 }
2241
2242
2243 /*
2244  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
2245  */
2246 void
2247 BufmgrCommit(void)
2248 {
2249         /* Nothing to do in bufmgr anymore... */
2250 }
2251
2252 /*
2253  * BufferGetBlockNumber
2254  *              Returns the block number associated with a buffer.
2255  *
2256  * Note:
2257  *              Assumes that the buffer is valid and pinned, else the
2258  *              value may be obsolete immediately...
2259  */
2260 BlockNumber
2261 BufferGetBlockNumber(Buffer buffer)
2262 {
2263         volatile BufferDesc *bufHdr;
2264
2265         Assert(BufferIsPinned(buffer));
2266
2267         if (BufferIsLocal(buffer))
2268                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
2269         else
2270                 bufHdr = &BufferDescriptors[buffer - 1];
2271
2272         /* pinned, so OK to read tag without spinlock */
2273         return bufHdr->tag.blockNum;
2274 }
2275
2276 /*
2277  * BufferGetTag
2278  *              Returns the relfilenode, fork number and block number associated with
2279  *              a buffer.
2280  */
2281 void
2282 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
2283                          BlockNumber *blknum)
2284 {
2285         volatile BufferDesc *bufHdr;
2286
2287         /* Do the same checks as BufferGetBlockNumber. */
2288         Assert(BufferIsPinned(buffer));
2289
2290         if (BufferIsLocal(buffer))
2291                 bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
2292         else
2293                 bufHdr = &BufferDescriptors[buffer - 1];
2294
2295         /* pinned, so OK to read tag without spinlock */
2296         *rnode = bufHdr->tag.rnode;
2297         *forknum = bufHdr->tag.forkNum;
2298         *blknum = bufHdr->tag.blockNum;
2299 }
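
/*
 * Usage sketch (hypothetical helper, not referenced in this file): both
 * BufferGetBlockNumber and BufferGetTag require only that the caller hold a
 * pin, since the tag cannot change while the buffer stays pinned.
 */
static void
sketch_report_buffer(Buffer buf)
{
	RelFileNode rnode;
	ForkNumber	forknum;
	BlockNumber blkno;

	BufferGetTag(buf, &rnode, &forknum, &blkno);

	elog(DEBUG1, "buffer %d holds block %u of fork %d of relation %u",
		 buf, blkno, forknum, rnode.relNode);
}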
2300
2301 /*
2302  * FlushBuffer
2303  *              Physically write out a shared buffer.
2304  *
2305  * NOTE: this actually just passes the buffer contents to the kernel; the
2306  * real write to disk won't happen until the kernel feels like it.  This
2307  * is okay from our point of view since we can redo the changes from WAL.
2308  * However, we will need to force the changes to disk via fsync before
2309  * we can checkpoint WAL.
2310  *
2311  * The caller must hold a pin on the buffer and have share-locked the
2312  * buffer contents.  (Note: a share-lock does not prevent updates of
2313  * hint bits in the buffer, so the page could change while the write
2314  * is in progress, but we assume that that will not invalidate the data
2315  * written.)
2316  *
2317  * If the caller has an smgr reference for the buffer's relation, pass it
2318  * as the second parameter.  If not, pass NULL.
2319  */
2320 static void
2321 FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
2322 {
2323         XLogRecPtr      recptr;
2324         ErrorContextCallback errcallback;
2325         instr_time      io_start,
2326                                 io_time;
2327         Block           bufBlock;
2328         char       *bufToWrite;
2329
2330         /*
2331          * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
2332          * false, then someone else flushed the buffer before we could, so we need
2333          * not do anything.
2334          */
2335         if (!StartBufferIO(buf, false))
2336                 return;
2337
2338         /* Set up error traceback support for ereport() */
2339         errcallback.callback = shared_buffer_write_error_callback;
2340         errcallback.arg = (void *) buf;
2341         errcallback.previous = error_context_stack;
2342         error_context_stack = &errcallback;
2343
2344         /* Find smgr relation for buffer */
2345         if (reln == NULL)
2346                 reln = smgropen(buf->tag.rnode, InvalidBackendId);
2347
2348         TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
2349                                                                                 buf->tag.blockNum,
2350                                                                                 reln->smgr_rnode.node.spcNode,
2351                                                                                 reln->smgr_rnode.node.dbNode,
2352                                                                                 reln->smgr_rnode.node.relNode);
2353
2354         LockBufHdr(buf);
2355
2356         /*
2357          * Run PageGetLSN while holding header lock, since we don't have the
2358          * buffer locked exclusively in all cases.
2359          */
2360         recptr = BufferGetLSN(buf);
2361
2362         /* To check if block content changes while flushing. - vadim 01/17/97 */
2363         buf->flags &= ~BM_JUST_DIRTIED;
2364         UnlockBufHdr(buf);
2365
2366         /*
2367          * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
2368          * rule that log updates must hit disk before any of the data-file changes
2369          * they describe do.
2370          *
2371          * However, this rule does not apply to unlogged relations, which will be
2372          * lost after a crash anyway.  Most unlogged relation pages do not bear
2373          * LSNs since we never emit WAL records for them, and therefore flushing
2374          * up through the buffer LSN would be useless, but harmless.  However,
2375          * GiST indexes use LSNs internally to track page-splits, and therefore
2376          * unlogged GiST pages bear "fake" LSNs generated by
2377          * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
2378          * LSN counter could advance past the WAL insertion point; and if it did
2379          * happen, attempting to flush WAL through that location would fail, with
2380          * disastrous system-wide consequences.  To make sure that can't happen,
2381          * skip the flush if the buffer isn't permanent.
2382          */
2383         if (buf->flags & BM_PERMANENT)
2384                 XLogFlush(recptr);
2385
2386         /*
2387          * Now it's safe to write buffer to disk. Note that no one else should
2388          * have been able to write it while we were busy with log flushing because
2389          * we have the io_in_progress lock.
2390          */
2391         bufBlock = BufHdrGetBlock(buf);
2392
2393         /*
2394          * Update page checksum if desired.  Since we have only shared lock on the
2395          * buffer, other processes might be updating hint bits in it, so we must
2396          * copy the page to private storage if we do checksumming.
2397          */
2398         bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
2399
2400         if (track_io_timing)
2401                 INSTR_TIME_SET_CURRENT(io_start);
2402
2403         /*
2404          * bufToWrite is either the shared buffer or a copy, as appropriate.
2405          */
2406         smgrwrite(reln,
2407                           buf->tag.forkNum,
2408                           buf->tag.blockNum,
2409                           bufToWrite,
2410                           false);
2411
2412         if (track_io_timing)
2413         {
2414                 INSTR_TIME_SET_CURRENT(io_time);
2415                 INSTR_TIME_SUBTRACT(io_time, io_start);
2416                 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2417                 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2418         }
2419
2420         pgBufferUsage.shared_blks_written++;
2421
2422         /*
2423          * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2424          * end the io_in_progress state.
2425          */
2426         TerminateBufferIO(buf, true, 0);
2427
2428         TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2429                                                                            buf->tag.blockNum,
2430                                                                            reln->smgr_rnode.node.spcNode,
2431                                                                            reln->smgr_rnode.node.dbNode,
2432                                                                            reln->smgr_rnode.node.relNode);
2433
2434         /* Pop the error context stack */
2435         error_context_stack = errcallback.previous;
2436 }
2437
2438 /*
2439  * RelationGetNumberOfBlocksInFork
2440  *              Determines the current number of pages in the specified relation fork.
2441  */
2442 BlockNumber
2443 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2444 {
2445         /* Open it at the smgr level if not already done */
2446         RelationOpenSmgr(relation);
2447
2448         return smgrnblocks(relation->rd_smgr, forkNum);
2449 }
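
/*
 * Usage sketch (hypothetical helper, not referenced in this file): reading
 * the last existing block of a relation's main fork.  The relation is
 * assumed to be open and adequately locked by the caller.
 */
static Buffer
sketch_read_last_block(Relation rel)
{
	BlockNumber nblocks = RelationGetNumberOfBlocksInFork(rel, MAIN_FORKNUM);

	if (nblocks == 0)
		return InvalidBuffer;

	return ReadBuffer(rel, nblocks - 1);
}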
2450
2451 /*
2452  * BufferIsPermanent
2453  *              Determines whether a buffer will potentially still be around after
2454  *              a crash.  Caller must hold a buffer pin.
2455  */
2456 bool
2457 BufferIsPermanent(Buffer buffer)
2458 {
2459         volatile BufferDesc *bufHdr;
2460
2461         /* Local buffers are used only for temp relations. */
2462         if (BufferIsLocal(buffer))
2463                 return false;
2464
2465         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2466         Assert(BufferIsValid(buffer));
2467         Assert(BufferIsPinned(buffer));
2468
2469         /*
2470          * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2471          * need not bother with the buffer header spinlock.  Even if someone else
2472          * changes the buffer header flags while we're doing this, we assume that
2473          * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
2474          * old value or the new value, but not random garbage.
2475          */
2476         bufHdr = &BufferDescriptors[buffer - 1];
2477         return (bufHdr->flags & BM_PERMANENT) != 0;
2478 }
2479
2480 /*
2481  * BufferGetLSNAtomic
2482  *              Retrieves the LSN of the buffer atomically using a buffer header lock.
2483  *              This is necessary for some callers who may not have an exclusive lock
2484  *              on the buffer.
2485  */
2486 XLogRecPtr
2487 BufferGetLSNAtomic(Buffer buffer)
2488 {
2489         volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
2490         char       *page = BufferGetPage(buffer);
2491         XLogRecPtr      lsn;
2492
2493         /*
2494          * If we don't need locking for correctness, fastpath out.
2495          */
2496         if (!DataChecksumsEnabled() || BufferIsLocal(buffer))
2497                 return PageGetLSN(page);
2498
2499         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2500         Assert(BufferIsValid(buffer));
2501         Assert(BufferIsPinned(buffer));
2502
2503         LockBufHdr(bufHdr);
2504         lsn = PageGetLSN(page);
2505         UnlockBufHdr(bufHdr);
2506
2507         return lsn;
2508 }
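
/*
 * Usage sketch (hypothetical helper, not referenced in this file): with data
 * checksums enabled, hint-bit logging can advance a page's LSN while only a
 * share lock is held, so a reader holding just a pin and a share lock should
 * fetch the LSN through BufferGetLSNAtomic rather than PageGetLSN.  Here the
 * LSN is compared against one captured earlier to see whether the page has
 * since been WAL-logged again.
 */
static bool
sketch_page_advanced(Buffer buf, XLogRecPtr prev_lsn)
{
	return BufferGetLSNAtomic(buf) != prev_lsn;
}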
2509
2510 /* ---------------------------------------------------------------------
2511  *              DropRelFileNodeBuffers
2512  *
2513  *              This function removes from the buffer pool all the pages of the
2514  *              specified relation fork that have block numbers >= firstDelBlock.
2515  *              (In particular, with firstDelBlock = 0, all pages are removed.)
2516  *              Dirty pages are simply dropped, without bothering to write them
2517  *              out first.  Therefore, this is NOT rollback-able, and so should be
2518  *              used only with extreme caution!
2519  *
2520  *              Currently, this is called only from smgr.c when the underlying file
2521  *              is about to be deleted or truncated (firstDelBlock is needed for
2522  *              the truncation case).  The data in the affected pages would therefore
2523  *              be deleted momentarily anyway, and there is no point in writing it.
2524  *              It is the responsibility of higher-level code to ensure that the
2525  *              deletion or truncation does not lose any data that could be needed
2526  *              later.  It is also the responsibility of higher-level code to ensure
2527  *              that no other process could be trying to load more pages of the
2528  *              relation into buffers.
2529  *
2530  *              XXX currently it sequentially searches the buffer pool, should be
2531  *              changed to more clever ways of searching.  However, this routine
2532  *              is used only in code paths that aren't very performance-critical,
2533  *              and we shouldn't slow down the hot paths to make it faster ...
2534  * --------------------------------------------------------------------
2535  */
2536 void
2537 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2538                                            BlockNumber firstDelBlock)
2539 {
2540         int                     i;
2541
2542         /* If it's a local relation, it's localbuf.c's problem. */
2543         if (RelFileNodeBackendIsTemp(rnode))
2544         {
2545                 if (rnode.backend == MyBackendId)
2546                         DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2547                 return;
2548         }
2549
2550         for (i = 0; i < NBuffers; i++)
2551         {
2552                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2553
2554                 /*
2555                  * We can make this a tad faster by prechecking the buffer tag before
2556                  * we attempt to lock the buffer; this saves a lot of lock
2557                  * acquisitions in typical cases.  It should be safe because the
2558                  * caller must have AccessExclusiveLock on the relation, or some other
2559                  * reason to be certain that no one is loading new pages of the rel
2560                  * into the buffer pool.  (Otherwise we might well miss such pages
2561                  * entirely.)  Therefore, while the tag might be changing while we
2562                  * look at it, it can't be changing *to* a value we care about, only
2563                  * *away* from such a value.  So false negatives are impossible, and
2564                  * false positives are safe because we'll recheck after getting the
2565                  * buffer lock.
2566                  *
2567                  * We could check forkNum and blockNum as well as the rnode, but the
2568                  * incremental win from doing so seems small.
2569                  */
2570                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2571                         continue;
2572
2573                 LockBufHdr(bufHdr);
2574                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2575                         bufHdr->tag.forkNum == forkNum &&
2576                         bufHdr->tag.blockNum >= firstDelBlock)
2577                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2578                 else
2579                         UnlockBufHdr(bufHdr);
2580         }
2581 }
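
/*
 * Hypothetical usage sketch (not compiled): roughly how an smgr-level
 * truncation discards buffers past the new end of a fork before shortening
 * the file on disk.  The function and parameter names are illustrative;
 * the real caller is smgrtruncate() in smgr.c.
 */
#ifdef NOT_USED
static void
discard_truncated_buffers(SMgrRelation reln, ForkNumber forknum,
						  BlockNumber new_nblocks)
{
	/* blocks with blockNum >= new_nblocks are about to vanish from disk */
	DropRelFileNodeBuffers(reln->smgr_rnode, forknum, new_nblocks);
}
#endif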
2582
2583 /* ---------------------------------------------------------------------
2584  *              DropRelFileNodesAllBuffers
2585  *
2586  *              This function removes from the buffer pool all the pages of all
2587  *              forks of the specified relations.  It's equivalent to calling
2588  *              DropRelFileNodeBuffers once per fork per relation with
2589  *              firstDelBlock = 0.
2590  * --------------------------------------------------------------------
2591  */
2592 void
2593 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2594 {
2595         int                     i,
2596                                 n = 0;
2597         RelFileNode *nodes;
2598         bool            use_bsearch;
2599
2600         if (nnodes == 0)
2601                 return;
2602
2603         nodes = palloc(sizeof(RelFileNode) * nnodes);           /* non-local relations */
2604
2605         /* If it's a local relation, it's localbuf.c's problem. */
2606         for (i = 0; i < nnodes; i++)
2607         {
2608                 if (RelFileNodeBackendIsTemp(rnodes[i]))
2609                 {
2610                         if (rnodes[i].backend == MyBackendId)
2611                                 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2612                 }
2613                 else
2614                         nodes[n++] = rnodes[i].node;
2615         }
2616
2617         /*
2618          * If there are no non-local relations, then we're done. Release the
2619          * memory and return.
2620          */
2621         if (n == 0)
2622         {
2623                 pfree(nodes);
2624                 return;
2625         }
2626
2627         /*
2628          * For a small number of relations to drop, just use a simple walk-through
2629          * to save the bsearch overhead.  The threshold is more of a guess than an
2630          * exactly determined value, as it depends on many factors (CPU and RAM
2631          * speeds, amount of shared buffers, etc.).
2632          */
2633         use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
2634
2635         /* sort the list of rnodes if necessary */
2636         if (use_bsearch)
2637                 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2638
2639         for (i = 0; i < NBuffers; i++)
2640         {
2641                 RelFileNode *rnode = NULL;
2642                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2643
2644                 /*
2645                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2646                  * and saves some cycles.
2647                  */
2648
2649                 if (!use_bsearch)
2650                 {
2651                         int                     j;
2652
2653                         for (j = 0; j < n; j++)
2654                         {
2655                                 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
2656                                 {
2657                                         rnode = &nodes[j];
2658                                         break;
2659                                 }
2660                         }
2661                 }
2662                 else
2663                 {
2664                         rnode = bsearch((const void *) &(bufHdr->tag.rnode),
2665                                                         nodes, n, sizeof(RelFileNode),
2666                                                         rnode_comparator);
2667                 }
2668
2669                 /* buffer doesn't belong to any of the given relfilenodes; skip it */
2670                 if (rnode == NULL)
2671                         continue;
2672
2673                 LockBufHdr(bufHdr);
2674                 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
2675                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2676                 else
2677                         UnlockBufHdr(bufHdr);
2678         }
2679
2680         pfree(nodes);
2681 }
2682
2683 /* ---------------------------------------------------------------------
2684  *              DropDatabaseBuffers
2685  *
2686  *              This function removes all the buffers in the buffer cache for a
2687  *              particular database.  Dirty pages are simply dropped, without
2688  *              bothering to write them out first.  This is used when we destroy a
2689  *              database, to avoid trying to flush data to disk when the directory
2690  *              tree no longer exists.  Implementation is pretty similar to
2691  *              DropRelFileNodeBuffers() which is for destroying just one relation.
2692  * --------------------------------------------------------------------
2693  */
2694 void
2695 DropDatabaseBuffers(Oid dbid)
2696 {
2697         int                     i;
2698
2699         /*
2700          * We needn't consider local buffers, since by assumption the target
2701          * database isn't our own.
2702          */
2703
2704         for (i = 0; i < NBuffers; i++)
2705         {
2706                 volatile BufferDesc *bufHdr = &BufferDescriptors[i];
2707
2708                 /*
2709                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2710                  * and saves some cycles.
2711                  */
2712                 if (bufHdr->tag.rnode.dbNode != dbid)
2713                         continue;
2714
2715                 LockBufHdr(bufHdr);
2716                 if (bufHdr->tag.rnode.dbNode == dbid)
2717                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2718                 else
2719                         UnlockBufHdr(bufHdr);
2720         }
2721 }
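
/*
 * Hypothetical usage sketch (not compiled): the ordering implied by the
 * comment above -- throw away cached pages of the doomed database before
 * its directory tree is removed, so nothing later tries to flush to missing
 * files.  The function name is illustrative; the real callers live in
 * dbcommands.c and the corresponding redo code.
 */
#ifdef NOT_USED
static void
discard_database(Oid db_id)
{
	DropDatabaseBuffers(db_id);
	/* ... now it is safe to remove the database's files ... */
}
#endif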
2722
2723 /* -----------------------------------------------------------------
2724  *              PrintBufferDescs
2725  *
2726  *              this function prints all the buffer descriptors, for debugging
2727  *              use only.
2728  * -----------------------------------------------------------------
2729  */
2730 #ifdef NOT_USED
2731 void
2732 PrintBufferDescs(void)
2733 {
2734         int                     i;
2735         volatile BufferDesc *buf = BufferDescriptors;
2736
2737         for (i = 0; i < NBuffers; ++i, ++buf)
2738         {
2739                 /* theoretically we should lock the bufhdr here */
2740                 elog(LOG,
2741                          "[%02d] (freeNext=%d, rel=%s, "
2742                          "blockNum=%u, flags=0x%x, refcount=%u %d)",
2743                          i, buf->freeNext,
2744                   relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2745                          buf->tag.blockNum, buf->flags,
2746                          buf->refcount, GetPrivateRefCount(i + 1));
2747         }
2748 }
2749 #endif
2750
2751 #ifdef NOT_USED
2752 void
2753 PrintPinnedBufs(void)
2754 {
2755         int                     i;
2756         volatile BufferDesc *buf = BufferDescriptors;
2757
2758         for (i = 0; i < NBuffers; ++i, ++buf)
2759         {
2760                 if (GetPrivateRefCount(i + 1) > 0)
2761                 {
2762                         /* theoretically we should lock the bufhdr here */
2763                         elog(LOG,
2764                                  "[%02d] (freeNext=%d, rel=%s, "
2765                                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
2766                                  i, buf->freeNext,
2767                                  relpath(buf->tag.rnode, buf->tag.forkNum),
2768                                  buf->tag.blockNum, buf->flags,
2769                                  buf->refcount, GetPrivateRefCount(i + 1));
2770                 }
2771         }
2772 }
2773 #endif
2774
2775 /* ---------------------------------------------------------------------
2776  *              FlushRelationBuffers
2777  *
2778  *              This function writes all dirty pages of a relation out to disk
2779  *              (or more accurately, out to kernel disk buffers), ensuring that the
2780  *              kernel has an up-to-date view of the relation.
2781  *
2782  *              Generally, the caller should be holding AccessExclusiveLock on the
2783  *              target relation to ensure that no other backend is busy dirtying
2784  *              more blocks of the relation; the effects can't be expected to last
2785  *              after the lock is released.
2786  *
2787  *              XXX currently it sequentially searches the buffer pool, should be
2788  *              changed to more clever ways of searching.  This routine is not
2789  *              used in any performance-critical code paths, so it's not worth
2790  *              adding additional overhead to normal paths to make it go faster;
2791  *              but see also DropRelFileNodeBuffers.
2792  * --------------------------------------------------------------------
2793  */
2794 void
2795 FlushRelationBuffers(Relation rel)
2796 {
2797         int                     i;
2798         volatile BufferDesc *bufHdr;
2799
2800         /* Open rel at the smgr level if not already done */
2801         RelationOpenSmgr(rel);
2802
2803         if (RelationUsesLocalBuffers(rel))
2804         {
2805                 for (i = 0; i < NLocBuffer; i++)
2806                 {
2807                         bufHdr = &LocalBufferDescriptors[i];
2808                         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2809                                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2810                         {
2811                                 ErrorContextCallback errcallback;
2812                                 Page            localpage;
2813
2814                                 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
2815
2816                                 /* Setup error traceback support for ereport() */
2817                                 errcallback.callback = local_buffer_write_error_callback;
2818                                 errcallback.arg = (void *) bufHdr;
2819                                 errcallback.previous = error_context_stack;
2820                                 error_context_stack = &errcallback;
2821
2822                                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
2823
2824                                 smgrwrite(rel->rd_smgr,
2825                                                   bufHdr->tag.forkNum,
2826                                                   bufHdr->tag.blockNum,
2827                                                   localpage,
2828                                                   false);
2829
2830                                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
2831
2832                                 /* Pop the error context stack */
2833                                 error_context_stack = errcallback.previous;
2834                         }
2835                 }
2836
2837                 return;
2838         }
2839
2840         /* Make sure we can handle the pin inside the loop */
2841         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2842
2843         for (i = 0; i < NBuffers; i++)
2844         {
2845                 bufHdr = &BufferDescriptors[i];
2846
2847                 /*
2848                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2849                  * and saves some cycles.
2850                  */
2851                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
2852                         continue;
2853
2854                 ReservePrivateRefCountEntry();
2855
2856                 LockBufHdr(bufHdr);
2857                 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2858                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2859                 {
2860                         PinBuffer_Locked(bufHdr);
2861                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2862                         FlushBuffer(bufHdr, rel->rd_smgr);
2863                         LWLockRelease(bufHdr->content_lock);
2864                         UnpinBuffer(bufHdr, true);
2865                 }
2866                 else
2867                         UnlockBufHdr(bufHdr);
2868         }
2869 }
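
/*
 * Hypothetical usage sketch (not compiled): forcing a relation's dirty
 * pages down to the kernel and then to disk, roughly the pattern used by
 * heap_sync() after a WAL-skipping bulk load.  "rel" is assumed to be
 * locked appropriately by the caller.
 */
#ifdef NOT_USED
static void
flush_and_sync_relation(Relation rel)
{
	FlushRelationBuffers(rel);					/* dirty pages -> kernel */
	smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);	/* kernel -> disk */
}
#endif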
2870
2871 /* ---------------------------------------------------------------------
2872  *              FlushDatabaseBuffers
2873  *
2874  *              This function writes all dirty pages of a database out to disk
2875  *              (or more accurately, out to kernel disk buffers), ensuring that the
2876  *              kernel has an up-to-date view of the database.
2877  *
2878  *              Generally, the caller should be holding an appropriate lock to ensure
2879  *              no other backend is active in the target database; otherwise more
2880  *              pages could get dirtied.
2881  *
2882  *              Note we don't worry about flushing any pages of temporary relations.
2883  *              It's assumed these wouldn't be interesting.
2884  * --------------------------------------------------------------------
2885  */
2886 void
2887 FlushDatabaseBuffers(Oid dbid)
2888 {
2889         int                     i;
2890         volatile BufferDesc *bufHdr;
2891
2892         /* Make sure we can handle the pin inside the loop */
2893         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2894
2895         for (i = 0; i < NBuffers; i++)
2896         {
2897                 bufHdr = &BufferDescriptors[i];
2898
2899                 /*
2900                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2901                  * and saves some cycles.
2902                  */
2903                 if (bufHdr->tag.rnode.dbNode != dbid)
2904                         continue;
2905
2906                 ReservePrivateRefCountEntry();
2907
2908                 LockBufHdr(bufHdr);
2909                 if (bufHdr->tag.rnode.dbNode == dbid &&
2910                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2911                 {
2912                         PinBuffer_Locked(bufHdr);
2913                         LWLockAcquire(bufHdr->content_lock, LW_SHARED);
2914                         FlushBuffer(bufHdr, NULL);
2915                         LWLockRelease(bufHdr->content_lock);
2916                         UnpinBuffer(bufHdr, true);
2917                 }
2918                 else
2919                         UnlockBufHdr(bufHdr);
2920         }
2921 }
2922
2923 /*
2924  * ReleaseBuffer -- release the pin on a buffer
2925  */
2926 void
2927 ReleaseBuffer(Buffer buffer)
2928 {
2929         if (!BufferIsValid(buffer))
2930                 elog(ERROR, "bad buffer ID: %d", buffer);
2931
2932         if (BufferIsLocal(buffer))
2933         {
2934                 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
2935
2936                 Assert(LocalRefCount[-buffer - 1] > 0);
2937                 LocalRefCount[-buffer - 1]--;
2938                 return;
2939         }
2940
2941         UnpinBuffer(&BufferDescriptors[buffer - 1], true);
2942 }
2943
2944 /*
2945  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
2946  *
2947  * This is just a shorthand for a common combination.
2948  */
2949 void
2950 UnlockReleaseBuffer(Buffer buffer)
2951 {
2952         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2953         ReleaseBuffer(buffer);
2954 }
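
/*
 * Hypothetical usage sketch (not compiled): the standard pin/lock/unpin
 * discipline for inspecting one block.  ReadBuffer() pins, LockBuffer()
 * takes the content lock, and UnlockReleaseBuffer() undoes both.  The
 * function name and return value are illustrative only.
 */
#ifdef NOT_USED
static bool
block_is_new(Relation rel, BlockNumber blkno)
{
	Buffer		buf;
	bool		result;

	buf = ReadBuffer(rel, blkno);
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	result = PageIsNew(BufferGetPage(buf));
	UnlockReleaseBuffer(buf);

	return result;
}
#endif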
2955
2956 /*
2957  * IncrBufferRefCount
2958  *              Increment the pin count on a buffer that we have *already* pinned
2959  *              at least once.
2960  *
2961  *              This function cannot be used on a buffer we do not have pinned,
2962  *              because it doesn't change the shared buffer state.
2963  */
2964 void
2965 IncrBufferRefCount(Buffer buffer)
2966 {
2967         Assert(BufferIsPinned(buffer));
2968         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2969         ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
2970         if (BufferIsLocal(buffer))
2971                 LocalRefCount[-buffer - 1]++;
2972         else
2973         {
2974                 PrivateRefCountEntry *ref;
2975                 ref = GetPrivateRefCountEntry(buffer, true);
2976                 Assert(ref != NULL);
2977                 ref->refcount++;
2978         }
2979 }
2980
2981 /*
2982  * MarkBufferDirtyHint
2983  *
2984  *      Mark a buffer dirty for non-critical changes.
2985  *
2986  * This is essentially the same as MarkBufferDirty, except:
2987  *
2988  * 1. The caller does not write WAL; so if checksums are enabled, we may need
2989  *        to write an XLOG_HINT WAL record to protect against torn pages.
2990  * 2. The caller might have only share-lock instead of exclusive-lock on the
2991  *        buffer's content lock.
2992  * 3. This function does not guarantee that the buffer is always marked dirty
2993  *        (due to a race condition), so it cannot be used for important changes.
2994  */
2995 void
2996 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
2997 {
2998         volatile BufferDesc *bufHdr;
2999         Page            page = BufferGetPage(buffer);
3000
3001         if (!BufferIsValid(buffer))
3002                 elog(ERROR, "bad buffer ID: %d", buffer);
3003
3004         if (BufferIsLocal(buffer))
3005         {
3006                 MarkLocalBufferDirty(buffer);
3007                 return;
3008         }
3009
3010         bufHdr = &BufferDescriptors[buffer - 1];
3011
3012         Assert(GetPrivateRefCount(buffer) > 0);
3013         /* here, either share or exclusive lock is OK */
3014         Assert(LWLockHeldByMe(bufHdr->content_lock));
3015
3016         /*
3017          * This routine might get called many times on the same page, if we are
3018          * making the first scan after commit of an xact that added/deleted many
3019          * tuples. So, be as quick as we can if the buffer is already dirty.  We
3020          * do this by not acquiring spinlock if it looks like the status bits are
3021          * do this by not acquiring the spinlock if it looks like the status bits
3022          * are already set.  Since we make this test unlocked, there's a chance we
3023          * might fail to notice that the flags have just been cleared, and fail to
3024          * reset them, due to memory-ordering issues.  But since this function
3025          * data would be harmless anyway, it doesn't really matter.
3026          */
3027         if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
3028                 (BM_DIRTY | BM_JUST_DIRTIED))
3029         {
3030                 XLogRecPtr      lsn = InvalidXLogRecPtr;
3031                 bool            dirtied = false;
3032                 bool            delayChkpt = false;
3033
3034                 /*
3035                  * If we need to protect hint bit updates from torn writes, WAL-log a
3036                  * full page image of the page. This full page image is only necessary
3037                  * if the hint bit update is the first change to the page since the
3038                  * last checkpoint.
3039                  *
3040                  * We don't check full_page_writes here because that logic is included
3041                  * when we call XLogInsert() since the value changes dynamically.
3042                  */
3043                 if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
3044                 {
3045                         /*
3046                          * If we're in recovery we cannot dirty a page because of a hint.
3047                  * We can set the hint, just not dirty the page as a result, so the
3048                  * hint is lost when we evict the page or shut down.
3049                          *
3050                          * See src/backend/storage/page/README for longer discussion.
3051                          */
3052                         if (RecoveryInProgress())
3053                                 return;
3054
3055                         /*
3056                          * If the block is already dirty because we either made a change
3057                          * or set a hint already, then we don't need to write a full page
3058                          * image.  Note that aggressive cleaning of blocks dirtied by hint
3059                          * bit setting would increase the call rate. Bulk setting of hint
3060                          * bits would reduce the call rate...
3061                          *
3062                          * We must issue the WAL record before we mark the buffer dirty.
3063                          * Otherwise we might write the page before we write the WAL. That
3064                          * causes a race condition, since a checkpoint might occur between
3065                          * writing the WAL record and marking the buffer dirty. We solve
3066                          * that with a kluge, but one that is already in use during
3067                          * transaction commit to prevent race conditions. Basically, we
3068                          * simply prevent the checkpoint WAL record from being written
3069                          * until we have marked the buffer dirty. We don't start the
3070                          * checkpoint flush until we have marked dirty, so our checkpoint
3071                          * must flush the change to disk successfully or the checkpoint
3072                  * never gets written, in which case crash recovery will fix things up.
3073                          *
3074                          * It's possible we may enter here without an xid, so it is
3075                          * essential that CreateCheckpoint waits for virtual transactions
3076                          * rather than full transactionids.
3077                          */
3078                         MyPgXact->delayChkpt = delayChkpt = true;
3079                         lsn = XLogSaveBufferForHint(buffer, buffer_std);
3080                 }
3081
3082                 LockBufHdr(bufHdr);
3083                 Assert(bufHdr->refcount > 0);
3084                 if (!(bufHdr->flags & BM_DIRTY))
3085                 {
3086                         dirtied = true;         /* Means "will be dirtied by this action" */
3087
3088                         /*
3089                          * Set the page LSN if we wrote a backup block. We aren't supposed
3090                          * to set this when only holding a share lock but as long as we
3091                          * serialise it somehow we're OK. We choose to set LSN while
3092                          * holding the buffer header lock, which causes any reader of an
3093                          * LSN who holds only a share lock to also obtain a buffer header
3094                          * lock before using PageGetLSN(), which is enforced in
3095                          * BufferGetLSNAtomic().
3096                          *
3097                          * If checksums are enabled, you might think we should reset the
3098                          * checksum here. That will happen when the page is written
3099                          * sometime later in this checkpoint cycle.
3100                          */
3101                         if (!XLogRecPtrIsInvalid(lsn))
3102                                 PageSetLSN(page, lsn);
3103                 }
3104                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
3105                 UnlockBufHdr(bufHdr);
3106
3107                 if (delayChkpt)
3108                         MyPgXact->delayChkpt = false;
3109
3110                 if (dirtied)
3111                 {
3112                         VacuumPageDirty++;
3113                         pgBufferUsage.shared_blks_dirtied++;
3114                         if (VacuumCostActive)
3115                                 VacuumCostBalance += VacuumCostPageDirty;
3116                 }
3117         }
3118 }
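
/*
 * Hypothetical usage sketch (not compiled): a hint-style page update.  The
 * caller holds a pin and at least a share lock; the change is not WAL-logged
 * by the caller, so MarkBufferDirtyHint() is used instead of
 * MarkBufferDirty().  PD_HAS_FREE_LINES is merely a hint bit, which is what
 * makes this safe; the function name and pairing are illustrative only.
 */
#ifdef NOT_USED
static void
set_free_line_pointer_hint(Buffer buf)
{
	Page		page = BufferGetPage(buf);

	PageSetHasFreeLinePointers(page);	/* non-critical, idempotent change */
	MarkBufferDirtyHint(buf, true);		/* true: standard page layout */
}
#endif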
3119
3120 /*
3121  * Release buffer content locks for shared buffers.
3122  *
3123  * Used to clean up after errors.
3124  *
3125  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
3126  * of releasing buffer content locks per se; the only thing we need to deal
3127  * with here is clearing any PIN_COUNT request that was in progress.
3128  */
3129 void
3130 UnlockBuffers(void)
3131 {
3132         volatile BufferDesc *buf = PinCountWaitBuf;
3133
3134         if (buf)
3135         {
3136                 LockBufHdr(buf);
3137
3138                 /*
3139                  * Don't complain if the flag bit is not set; it could have been reset,
3140                  * but we got a cancel/die interrupt before getting the signal.
3141                  */
3142                 if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
3143                         buf->wait_backend_pid == MyProcPid)
3144                         buf->flags &= ~BM_PIN_COUNT_WAITER;
3145
3146                 UnlockBufHdr(buf);
3147
3148                 PinCountWaitBuf = NULL;
3149         }
3150 }
3151
3152 /*
3153  * Acquire or release the content_lock for the buffer.
3154  */
3155 void
3156 LockBuffer(Buffer buffer, int mode)
3157 {
3158         volatile BufferDesc *buf;
3159
3160         Assert(BufferIsValid(buffer));
3161         if (BufferIsLocal(buffer))
3162                 return;                                 /* local buffers need no lock */
3163
3164         buf = &(BufferDescriptors[buffer - 1]);
3165
3166         if (mode == BUFFER_LOCK_UNLOCK)
3167                 LWLockRelease(buf->content_lock);
3168         else if (mode == BUFFER_LOCK_SHARE)
3169                 LWLockAcquire(buf->content_lock, LW_SHARED);
3170         else if (mode == BUFFER_LOCK_EXCLUSIVE)
3171                 LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
3172         else
3173                 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
3174 }
3175
3176 /*
3177  * Acquire the content_lock for the buffer, but only if we don't have to wait.
3178  *
3179  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
3180  */
3181 bool
3182 ConditionalLockBuffer(Buffer buffer)
3183 {
3184         volatile BufferDesc *buf;
3185
3186         Assert(BufferIsValid(buffer));
3187         if (BufferIsLocal(buffer))
3188                 return true;                    /* act as though we got it */
3189
3190         buf = &(BufferDescriptors[buffer - 1]);
3191
3192         return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
3193 }
3194
3195 /*
3196  * LockBufferForCleanup - lock a buffer in preparation for deleting items
3197  *
3198  * Items may be deleted from a disk page only when the caller (a) holds an
3199  * exclusive lock on the buffer and (b) has observed that no other backend
3200  * holds a pin on the buffer.  If there is a pin, then the other backend
3201  * might have a pointer into the buffer (for example, a heapscan reference
3202  * to an item --- see README for more details).  It's OK if a pin is added
3203  * after the cleanup starts, however; the newly-arrived backend will be
3204  * unable to look at the page until we release the exclusive lock.
3205  *
3206  * To implement this protocol, a would-be deleter must pin the buffer and
3207  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
3208  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
3209  * it has successfully observed pin count = 1.
3210  */
3211 void
3212 LockBufferForCleanup(Buffer buffer)
3213 {
3214         volatile BufferDesc *bufHdr;
3215
3216         Assert(BufferIsValid(buffer));
3217         Assert(PinCountWaitBuf == NULL);
3218
3219         if (BufferIsLocal(buffer))
3220         {
3221                 /* There should be exactly one pin */
3222                 if (LocalRefCount[-buffer - 1] != 1)
3223                         elog(ERROR, "incorrect local pin count: %d",
3224                                  LocalRefCount[-buffer - 1]);
3225                 /* Nobody else to wait for */
3226                 return;
3227         }
3228
3229         /* There should be exactly one local pin */
3230         if (GetPrivateRefCount(buffer) != 1)
3231                 elog(ERROR, "incorrect local pin count: %d",
3232                          GetPrivateRefCount(buffer));
3233
3234         bufHdr = &BufferDescriptors[buffer - 1];
3235
3236         for (;;)
3237         {
3238                 /* Try to acquire lock */
3239                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3240                 LockBufHdr(bufHdr);
3241                 Assert(bufHdr->refcount > 0);
3242                 if (bufHdr->refcount == 1)
3243                 {
3244                         /* Successfully acquired exclusive lock with pincount 1 */
3245                         UnlockBufHdr(bufHdr);
3246                         return;
3247                 }
3248                 /* Failed, so mark myself as waiting for pincount 1 */
3249                 if (bufHdr->flags & BM_PIN_COUNT_WAITER)
3250                 {
3251                         UnlockBufHdr(bufHdr);
3252                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3253                         elog(ERROR, "multiple backends attempting to wait for pincount 1");
3254                 }
3255                 bufHdr->wait_backend_pid = MyProcPid;
3256                 bufHdr->flags |= BM_PIN_COUNT_WAITER;
3257                 PinCountWaitBuf = bufHdr;
3258                 UnlockBufHdr(bufHdr);
3259                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3260
3261                 /* Wait to be signaled by UnpinBuffer() */
3262                 if (InHotStandby)
3263                 {
3264                         /* Publish the bufid that Startup process waits on */
3265                         SetStartupBufferPinWaitBufId(buffer - 1);
3266                         /* Set alarm and then wait to be signaled by UnpinBuffer() */
3267                         ResolveRecoveryConflictWithBufferPin();
3268                         /* Reset the published bufid */
3269                         SetStartupBufferPinWaitBufId(-1);
3270                 }
3271                 else
3272                         ProcWaitForSignal();
3273
3274                 PinCountWaitBuf = NULL;
3275                 /* Loop back and try again */
3276         }
3277 }
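
/*
 * Hypothetical usage sketch (not compiled): the cleanup-lock protocol
 * described above -- take our own pin first, then wait until we hold the
 * exclusive lock with pin count 1 before deleting items.  The function
 * name is illustrative; VACUUM is the typical real caller.
 */
#ifdef NOT_USED
static void
cleanup_block(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);

	LockBufferForCleanup(buf);	/* exclusive lock + sole pin holder */

	/* ... safe to remove items from BufferGetPage(buf) here ... */

	UnlockReleaseBuffer(buf);
}
#endif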
3278
3279 /*
3280  * Check called from RecoveryConflictInterrupt handler when Startup
3281  * process requests cancellation of all pin holders that are blocking it.
3282  */
3283 bool
3284 HoldingBufferPinThatDelaysRecovery(void)
3285 {
3286         int                     bufid = GetStartupBufferPinWaitBufId();
3287
3288         /*
3289          * If we get woken slowly then it's possible that the Startup process was
3290          * already woken by other backends before we got here.  It's also possible
3291          * that we got here via multiple interrupts or interrupts at inappropriate
3292          * times, so make sure we do nothing if the bufid is not set.
3293          */
3294         if (bufid < 0)
3295                 return false;
3296
3297         if (GetPrivateRefCount(bufid + 1) > 0)
3298                 return true;
3299
3300         return false;
3301 }
3302
3303 /*
3304  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
3305  *
3306  * We won't loop, but just check once to see if the pin count is OK.  If
3307  * not, return FALSE with no lock held.
3308  */
3309 bool
3310 ConditionalLockBufferForCleanup(Buffer buffer)
3311 {
3312         volatile BufferDesc *bufHdr;
3313
3314         Assert(BufferIsValid(buffer));
3315
3316         if (BufferIsLocal(buffer))
3317         {
3318                 /* There should be exactly one pin */
3319                 Assert(LocalRefCount[-buffer - 1] > 0);
3320                 if (LocalRefCount[-buffer - 1] != 1)
3321                         return false;
3322                 /* Nobody else to wait for */
3323                 return true;
3324         }
3325
3326         /* There should be exactly one local pin */
3327         Assert(GetPrivateRefCount(buffer) > 0);
3328         if (GetPrivateRefCount(buffer) != 1)
3329                 return false;
3330
3331         /* Try to acquire lock */
3332         if (!ConditionalLockBuffer(buffer))
3333                 return false;
3334
3335         bufHdr = &BufferDescriptors[buffer - 1];
3336         LockBufHdr(bufHdr);
3337         Assert(bufHdr->refcount > 0);
3338         if (bufHdr->refcount == 1)
3339         {
3340                 /* Successfully acquired exclusive lock with pincount 1 */
3341                 UnlockBufHdr(bufHdr);
3342                 return true;
3343         }
3344
3345         /* Failed, so release the lock */
3346         UnlockBufHdr(bufHdr);
3347         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3348         return false;
3349 }
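
/*
 * Hypothetical usage sketch (not compiled): the opportunistic variant --
 * if the cleanup lock is not immediately available, skip the page rather
 * than wait, as lazy VACUUM does.  The caller is assumed to hold exactly
 * one pin on "buf"; the function name is illustrative only.
 */
#ifdef NOT_USED
static bool
try_cleanup_block(Buffer buf)
{
	if (!ConditionalLockBufferForCleanup(buf))
		return false;			/* someone else is using the page; skip it */

	/* ... cleanup work on BufferGetPage(buf) ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);	/* keep our pin */
	return true;
}
#endif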
3350
3351
3352 /*
3353  *      Functions for buffer I/O handling
3354  *
3355  *      Note: We assume that nested buffer I/O never occurs.
3356  *      i.e., at most one io_in_progress lock is held per proc.
3357  *
3358  *      Also note that these are used only for shared buffers, not local ones.
3359  */
3360
3361 /*
3362  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
3363  */
3364 static void
3365 WaitIO(volatile BufferDesc *buf)
3366 {
3367         /*
3368          * Changed to wait until there's no IO - Inoue 01/13/2000
3369          *
3370          * Note this is *necessary* because an error abort in the process doing
3371          * I/O could release the io_in_progress_lock prematurely. See
3372          * AbortBufferIO.
3373          */
3374         for (;;)
3375         {
3376                 BufFlags        sv_flags;
3377
3378                 /*
3379                  * It may not be necessary to acquire the spinlock to check the flag
3380                  * here, but since this test is essential for correctness, we'd better
3381                  * play it safe.
3382                  */
3383                 LockBufHdr(buf);
3384                 sv_flags = buf->flags;
3385                 UnlockBufHdr(buf);
3386                 if (!(sv_flags & BM_IO_IN_PROGRESS))
3387                         break;
3388                 LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
3389                 LWLockRelease(buf->io_in_progress_lock);
3390         }
3391 }
3392
3393 /*
3394  * StartBufferIO: begin I/O on this buffer
3395  *      (Assumptions)
3396  *      My process is executing no IO
3397  *      The buffer is Pinned
3398  *
3399  * In some scenarios there are race conditions in which multiple backends
3400  * could attempt the same I/O operation concurrently.  If someone else
3401  * has already started I/O on this buffer then we will block on the
3402  * io_in_progress lock until he's done.
3403  *
3404  * Input operations are only attempted on buffers that are not BM_VALID,
3405  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
3406  * so we can always tell if the work is already done.
3407  *
3408  * Returns TRUE if we successfully marked the buffer as I/O busy,
3409  * FALSE if someone else already did the work.
3410  */
3411 static bool
3412 StartBufferIO(volatile BufferDesc *buf, bool forInput)
3413 {
3414         Assert(!InProgressBuf);
3415
3416         for (;;)
3417         {
3418                 /*
3419                  * Grab the io_in_progress lock so that other processes can wait for
3420                  * me to finish the I/O.
3421                  */
3422                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3423
3424                 LockBufHdr(buf);
3425
3426                 if (!(buf->flags & BM_IO_IN_PROGRESS))
3427                         break;
3428
3429                 /*
3430                  * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3431                  * lock isn't held is if the process doing the I/O is recovering from
3432                  * an error (see AbortBufferIO).  If that's the case, we must wait for
3433                  * him to get unwedged.
3434                  */
3435                 UnlockBufHdr(buf);
3436                 LWLockRelease(buf->io_in_progress_lock);
3437                 WaitIO(buf);
3438         }
3439
3440         /* Once we get here, there is definitely no I/O active on this buffer */
3441
3442         if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
3443         {
3444                 /* someone else already did the I/O */
3445                 UnlockBufHdr(buf);
3446                 LWLockRelease(buf->io_in_progress_lock);
3447                 return false;
3448         }
3449
3450         buf->flags |= BM_IO_IN_PROGRESS;
3451
3452         UnlockBufHdr(buf);
3453
3454         InProgressBuf = buf;
3455         IsForInput = forInput;
3456
3457         return true;
3458 }
3459
3460 /*
3461  * TerminateBufferIO: release a buffer we were doing I/O on
3462  *      (Assumptions)
3463  *      My process is executing IO for the buffer
3464  *      BM_IO_IN_PROGRESS bit is set for the buffer
3465  *      We hold the buffer's io_in_progress lock
3466  *      The buffer is Pinned
3467  *
3468  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3469  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
3470  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
3471  * marking the buffer clean if it was re-dirtied while we were writing.
3472  *
3473  * set_flag_bits gets ORed into the buffer's flags.  It must include
3474  * BM_IO_ERROR in a failure case.  For successful completion it could
3475  * be 0, or BM_VALID if we just finished reading in the page.
3476  */
3477 static void
3478 TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
3479                                   int set_flag_bits)
3480 {
3481         Assert(buf == InProgressBuf);
3482
3483         LockBufHdr(buf);
3484
3485         Assert(buf->flags & BM_IO_IN_PROGRESS);
3486         buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3487         if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
3488                 buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3489         buf->flags |= set_flag_bits;
3490
3491         UnlockBufHdr(buf);
3492
3493         InProgressBuf = NULL;
3494
3495         LWLockRelease(buf->io_in_progress_lock);
3496 }
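
/*
 * Hypothetical usage sketch (not compiled): the shape of the read path that
 * ReadBuffer_common() builds from these primitives -- claim the buffer for
 * input, perform the smgr read, then mark it valid.  Error handling and
 * checksum verification are omitted; the function name is illustrative.
 */
#ifdef NOT_USED
static void
read_block_into_buffer(volatile BufferDesc *bufHdr, SMgrRelation smgr)
{
	if (!StartBufferIO(bufHdr, true))
		return;					/* someone else already read the page in */

	smgrread(smgr, bufHdr->tag.forkNum, bufHdr->tag.blockNum,
			 (char *) BufHdrGetBlock(bufHdr));

	/* clears BM_IO_IN_PROGRESS and sets BM_VALID */
	TerminateBufferIO(bufHdr, false, BM_VALID);
}
#endif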
3497
3498 /*
3499  * AbortBufferIO: Clean up any active buffer I/O after an error.
3500  *
3501  *      All LWLocks we might have held have been released,
3502  *      but we haven't yet released buffer pins, so the buffer is still pinned.
3503  *
3504  *      If I/O was in progress, we always set BM_IO_ERROR, even though it's
3505  *      possible the error condition wasn't related to the I/O.
3506  */
3507 void
3508 AbortBufferIO(void)
3509 {
3510         volatile BufferDesc *buf = InProgressBuf;
3511
3512         if (buf)
3513         {
3514                 /*
3515                  * Since LWLockReleaseAll has already been called, we're not holding
3516                  * the buffer's io_in_progress_lock. We have to re-acquire it so that
3517                  * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3518                  * buffer will be in a busy spin until we succeed in doing this.
3519                  */
3520                 LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
3521
3522                 LockBufHdr(buf);
3523                 Assert(buf->flags & BM_IO_IN_PROGRESS);
3524                 if (IsForInput)
3525                 {
3526                         Assert(!(buf->flags & BM_DIRTY));
3527                         /* We'd better not think buffer is valid yet */
3528                         Assert(!(buf->flags & BM_VALID));
3529                         UnlockBufHdr(buf);
3530                 }
3531                 else
3532                 {
3533                         BufFlags        sv_flags;
3534
3535                         sv_flags = buf->flags;
3536                         Assert(sv_flags & BM_DIRTY);
3537                         UnlockBufHdr(buf);
3538                         /* Issue notice if this is not the first failure... */
3539                         if (sv_flags & BM_IO_ERROR)
3540                         {
3541                                 /* Buffer is pinned, so we can read tag without spinlock */
3542                                 char       *path;
3543
3544                                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
3545                                 ereport(WARNING,
3546                                                 (errcode(ERRCODE_IO_ERROR),
3547                                                  errmsg("could not write block %u of %s",
3548                                                                 buf->tag.blockNum, path),
3549                                                  errdetail("Multiple failures --- write error might be permanent.")));
3550                                 pfree(path);
3551                         }
3552                 }
3553                 TerminateBufferIO(buf, false, BM_IO_ERROR);
3554         }
3555 }
3556
3557 /*
3558  * Error context callback for errors occurring during shared buffer writes.
3559  */
3560 static void
3561 shared_buffer_write_error_callback(void *arg)
3562 {
3563         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3564
3565         /* Buffer is pinned, so we can read the tag without locking the spinlock */
3566         if (bufHdr != NULL)
3567         {
3568                 char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
3569
3570                 errcontext("writing block %u of relation %s",
3571                                    bufHdr->tag.blockNum, path);
3572                 pfree(path);
3573         }
3574 }
3575
3576 /*
3577  * Error context callback for errors occurring during local buffer writes.
3578  */
3579 static void
3580 local_buffer_write_error_callback(void *arg)
3581 {
3582         volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
3583
3584         if (bufHdr != NULL)
3585         {
3586                 char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
3587                                                                                   bufHdr->tag.forkNum);
3588
3589                 errcontext("writing block %u of relation %s",
3590                                    bufHdr->tag.blockNum, path);
3591                 pfree(path);
3592         }
3593 }
3594
3595 /*
3596  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
3597  */
3598 static int
3599 rnode_comparator(const void *p1, const void *p2)
3600 {
3601         RelFileNode n1 = *(RelFileNode *) p1;
3602         RelFileNode n2 = *(RelFileNode *) p2;
3603
3604         if (n1.relNode < n2.relNode)
3605                 return -1;
3606         else if (n1.relNode > n2.relNode)
3607                 return 1;
3608
3609         if (n1.dbNode < n2.dbNode)
3610                 return -1;
3611         else if (n1.dbNode > n2.dbNode)
3612                 return 1;
3613
3614         if (n1.spcNode < n2.spcNode)
3615                 return -1;
3616         else if (n1.spcNode > n2.spcNode)
3617                 return 1;
3618         else
3619                 return 0;
3620 }
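
/*
 * Hypothetical usage sketch (not compiled): the sort-then-probe pairing
 * that DropRelFileNodesAllBuffers() relies on -- order the array once with
 * pg_qsort() and the same comparator, then bsearch() each lookup key.  The
 * function name and parameters are illustrative only.
 */
#ifdef NOT_USED
static bool
rnode_in_list(RelFileNode key, RelFileNode *nodes, int n)
{
	/* one-time sort; the real code does this before its per-buffer loop */
	pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);

	return bsearch(&key, nodes, n, sizeof(RelFileNode),
				   rnode_comparator) != NULL;
}
#endif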