]> granicus.if.org Git - postgresql/blob - src/backend/storage/buffer/bufmgr.c
Update copyright for 2016
[postgresql] / src / backend / storage / buffer / bufmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *              and pin it so that no one can destroy it while this process
20  *              is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *              The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *              freelist.c -- chooses victim for buffer replacement
29  *              buf_table.c -- manages the buffer lookup table
30  */
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "access/xlog.h"
37 #include "catalog/catalog.h"
38 #include "catalog/storage.h"
39 #include "executor/instrument.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/buf_internals.h"
45 #include "storage/bufmgr.h"
46 #include "storage/ipc.h"
47 #include "storage/proc.h"
48 #include "storage/smgr.h"
49 #include "storage/standby.h"
50 #include "utils/rel.h"
51 #include "utils/resowner_private.h"
52 #include "utils/timestamp.h"
53
54
55 /* Note: these two macros only work on shared buffers, not local ones! */
56 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
57 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
58
59 /* Note: this macro only works on local buffers, not shared ones! */
60 #define LocalBufHdrGetBlock(bufHdr) \
61         LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
62
63 /* Bits in SyncOneBuffer's return value */
64 #define BUF_WRITTEN                             0x01
65 #define BUF_REUSABLE                    0x02
66
67 #define DROP_RELS_BSEARCH_THRESHOLD             20
68
69 typedef struct PrivateRefCountEntry
70 {
71         Buffer          buffer;
72         int32           refcount;
73 } PrivateRefCountEntry;
74
75 /* 64 bytes, about the size of a cache line on common systems */
76 #define REFCOUNT_ARRAY_ENTRIES 8
77
78 /* GUC variables */
79 bool            zero_damaged_pages = false;
80 int                     bgwriter_lru_maxpages = 100;
81 double          bgwriter_lru_multiplier = 2.0;
82 bool            track_io_timing = false;
83 int                     effective_io_concurrency = 0;
84
85 /*
86  * How many buffers PrefetchBuffer callers should try to stay ahead of their
87  * ReadBuffer calls by.  This is maintained by the assign hook for
88  * effective_io_concurrency.  Zero means "never prefetch".  This value is
89  * only used for buffers not belonging to tablespaces that have their
90  * effective_io_concurrency parameter set.
91  */
92 int                     target_prefetch_pages = 0;
93
94 /* local state for StartBufferIO and related functions */
95 static BufferDesc *InProgressBuf = NULL;
96 static bool IsForInput;
97
98 /* local state for LockBufferForCleanup */
99 static BufferDesc *PinCountWaitBuf = NULL;
100
101 /*
102  * Backend-Private refcount management:
103  *
104  * Each buffer also has a private refcount that keeps track of the number of
105  * times the buffer is pinned in the current process.  This is so that the
106  * shared refcount needs to be modified only once if a buffer is pinned more
107  * than once by an individual backend.  It's also used to check that no buffers
108  * are still pinned at the end of transactions and when exiting.
109  *
110  *
111  * To avoid - as we used to - requiring an array with NBuffers entries to keep
112  * track of local buffers, we use a small sequentially searched array
113  * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
114  * keep track of backend local pins.
115  *
116  * Until no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once, all
117  * refcounts are kept track of in the array; after that, new array entries
118  * displace old ones into the hash table. That way a frequently used entry
119  * can't get "stuck" in the hashtable while infrequent ones clog the array.
120  *
121  * Note that in most scenarios the number of pinned buffers will not exceed
122  * REFCOUNT_ARRAY_ENTRIES.
123  *
124  *
125  * To enter a buffer into the refcount tracking mechanism first reserve a free
126  * entry using ReservePrivateRefCountEntry() and then later, if necessary,
127  * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
128  * memory allocations in NewPrivateRefCountEntry() which can be important
129  * because in some scenarios it's called with a spinlock held...
130  */
131 static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
132 static HTAB *PrivateRefCountHash = NULL;
133 static int32 PrivateRefCountOverflowed = 0;
134 static uint32 PrivateRefCountClock = 0;
135 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
136
137 static void ReservePrivateRefCountEntry(void);
138 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
139 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
140 static inline int32 GetPrivateRefCount(Buffer buffer);
141 static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
142
143 /*
144  * Ensure that the PrivateRefCountArray has sufficient space to store one more
145  * entry. This has to be called before using NewPrivateRefCountEntry() to fill
146  * a new entry - but it's perfectly fine to not use a reserved entry.
147  */
148 static void
149 ReservePrivateRefCountEntry(void)
150 {
151         /* Already reserved (or freed), nothing to do */
152         if (ReservedRefCountEntry != NULL)
153                 return;
154
155         /*
156          * First search for a free entry the array, that'll be sufficient in the
157          * majority of cases.
158          */
159         {
160                 int                     i;
161
162                 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
163                 {
164                         PrivateRefCountEntry *res;
165
166                         res = &PrivateRefCountArray[i];
167
168                         if (res->buffer == InvalidBuffer)
169                         {
170                                 ReservedRefCountEntry = res;
171                                 return;
172                         }
173                 }
174         }
175
176         /*
177          * No luck. All array entries are full. Move one array entry into the hash
178          * table.
179          */
180         {
181                 /*
182                  * Move entry from the current clock position in the array into the
183                  * hashtable. Use that slot.
184                  */
185                 PrivateRefCountEntry *hashent;
186                 bool            found;
187
188                 /* select victim slot */
189                 ReservedRefCountEntry =
190                         &PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
191
192                 /* Better be used, otherwise we shouldn't get here. */
193                 Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
194
195                 /* enter victim array entry into hashtable */
196                 hashent = hash_search(PrivateRefCountHash,
197                                                           (void *) &(ReservedRefCountEntry->buffer),
198                                                           HASH_ENTER,
199                                                           &found);
200                 Assert(!found);
201                 hashent->refcount = ReservedRefCountEntry->refcount;
202
203                 /* clear the now free array slot */
204                 ReservedRefCountEntry->buffer = InvalidBuffer;
205                 ReservedRefCountEntry->refcount = 0;
206
207                 PrivateRefCountOverflowed++;
208         }
209 }
210
211 /*
212  * Fill a previously reserved refcount entry.
213  */
214 static PrivateRefCountEntry *
215 NewPrivateRefCountEntry(Buffer buffer)
216 {
217         PrivateRefCountEntry *res;
218
219         /* only allowed to be called when a reservation has been made */
220         Assert(ReservedRefCountEntry != NULL);
221
222         /* use up the reserved entry */
223         res = ReservedRefCountEntry;
224         ReservedRefCountEntry = NULL;
225
226         /* and fill it */
227         res->buffer = buffer;
228         res->refcount = 0;
229
230         return res;
231 }
232
233 /*
234  * Return the PrivateRefCount entry for the passed buffer.
235  *
236  * Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
237  * do_move is true, and the entry resides in the hashtable the entry is
238  * optimized for frequent access by moving it to the array.
239  */
240 static PrivateRefCountEntry *
241 GetPrivateRefCountEntry(Buffer buffer, bool do_move)
242 {
243         PrivateRefCountEntry *res;
244         int                     i;
245
246         Assert(BufferIsValid(buffer));
247         Assert(!BufferIsLocal(buffer));
248
249         /*
250          * First search for references in the array, that'll be sufficient in the
251          * majority of cases.
252          */
253         for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
254         {
255                 res = &PrivateRefCountArray[i];
256
257                 if (res->buffer == buffer)
258                         return res;
259         }
260
261         /*
262          * By here we know that the buffer, if already pinned, isn't residing in
263          * the array.
264          *
265          * Only look up the buffer in the hashtable if we've previously overflowed
266          * into it.
267          */
268         if (PrivateRefCountOverflowed == 0)
269                 return NULL;
270
271         res = hash_search(PrivateRefCountHash,
272                                           (void *) &buffer,
273                                           HASH_FIND,
274                                           NULL);
275
276         if (res == NULL)
277                 return NULL;
278         else if (!do_move)
279         {
280                 /* caller doesn't want us to move the hash entry into the array */
281                 return res;
282         }
283         else
284         {
285                 /* move buffer from hashtable into the free array slot */
286                 bool            found;
287                 PrivateRefCountEntry *free;
288
289                 /* Ensure there's a free array slot */
290                 ReservePrivateRefCountEntry();
291
292                 /* Use up the reserved slot */
293                 Assert(ReservedRefCountEntry != NULL);
294                 free = ReservedRefCountEntry;
295                 ReservedRefCountEntry = NULL;
296                 Assert(free->buffer == InvalidBuffer);
297
298                 /* and fill it */
299                 free->buffer = buffer;
300                 free->refcount = res->refcount;
301
302                 /* delete from hashtable */
303                 hash_search(PrivateRefCountHash,
304                                         (void *) &buffer,
305                                         HASH_REMOVE,
306                                         &found);
307                 Assert(found);
308                 Assert(PrivateRefCountOverflowed > 0);
309                 PrivateRefCountOverflowed--;
310
311                 return free;
312         }
313 }
314
315 /*
316  * Returns how many times the passed buffer is pinned by this backend.
317  *
318  * Only works for shared memory buffers!
319  */
320 static inline int32
321 GetPrivateRefCount(Buffer buffer)
322 {
323         PrivateRefCountEntry *ref;
324
325         Assert(BufferIsValid(buffer));
326         Assert(!BufferIsLocal(buffer));
327
328         /*
329          * Not moving the entry - that's ok for the current users, but we might
330          * want to change this one day.
331          */
332         ref = GetPrivateRefCountEntry(buffer, false);
333
334         if (ref == NULL)
335                 return 0;
336         return ref->refcount;
337 }
338
339 /*
340  * Release resources used to track the reference count of a buffer which we no
341  * longer have pinned and don't want to pin again immediately.
342  */
343 static void
344 ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
345 {
346         Assert(ref->refcount == 0);
347
348         if (ref >= &PrivateRefCountArray[0] &&
349                 ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
350         {
351                 ref->buffer = InvalidBuffer;
352
353                 /*
354                  * Mark the just used entry as reserved - in many scenarios that
355                  * allows us to avoid ever having to search the array/hash for free
356                  * entries.
357                  */
358                 ReservedRefCountEntry = ref;
359         }
360         else
361         {
362                 bool            found;
363                 Buffer          buffer = ref->buffer;
364
365                 hash_search(PrivateRefCountHash,
366                                         (void *) &buffer,
367                                         HASH_REMOVE,
368                                         &found);
369                 Assert(found);
370                 Assert(PrivateRefCountOverflowed > 0);
371                 PrivateRefCountOverflowed--;
372         }
373 }
374
375 /*
376  * BufferIsPinned
377  *              True iff the buffer is pinned (also checks for valid buffer number).
378  *
379  *              NOTE: what we check here is that *this* backend holds a pin on
380  *              the buffer.  We do not care whether some other backend does.
381  */
382 #define BufferIsPinned(bufnum) \
383 ( \
384         !BufferIsValid(bufnum) ? \
385                 false \
386         : \
387                 BufferIsLocal(bufnum) ? \
388                         (LocalRefCount[-(bufnum) - 1] > 0) \
389                 : \
390         (GetPrivateRefCount(bufnum) > 0) \
391 )
392
393
394 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
395                                   ForkNumber forkNum, BlockNumber blockNum,
396                                   ReadBufferMode mode, BufferAccessStrategy strategy,
397                                   bool *hit);
398 static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
399 static void PinBuffer_Locked(BufferDesc *buf);
400 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
401 static void BufferSync(int flags);
402 static int      SyncOneBuffer(int buf_id, bool skip_recently_used);
403 static void WaitIO(BufferDesc *buf);
404 static bool StartBufferIO(BufferDesc *buf, bool forInput);
405 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
406                                   int set_flag_bits);
407 static void shared_buffer_write_error_callback(void *arg);
408 static void local_buffer_write_error_callback(void *arg);
409 static BufferDesc *BufferAlloc(SMgrRelation smgr,
410                         char relpersistence,
411                         ForkNumber forkNum,
412                         BlockNumber blockNum,
413                         BufferAccessStrategy strategy,
414                         bool *foundPtr);
415 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
416 static void AtProcExit_Buffers(int code, Datum arg);
417 static void CheckForBufferLeaks(void);
418 static int      rnode_comparator(const void *p1, const void *p2);
419
420
421 /*
422  * ComputeIoConcurrency -- get the number of pages to prefetch for a given
423  *              number of spindles.
424  */
425 bool
426 ComputeIoConcurrency(int io_concurrency, double *target)
427 {
428         double          new_prefetch_pages = 0.0;
429         int                     i;
430
431         /*
432          * Make sure the io_concurrency value is within valid range; it may have
433          * been forced with a manual pg_tablespace update.
434          */
435         io_concurrency = Min(Max(io_concurrency, 0), MAX_IO_CONCURRENCY);
436
437         /*----------
438          * The user-visible GUC parameter is the number of drives (spindles),
439          * which we need to translate to a number-of-pages-to-prefetch target.
440          * The target value is stashed in *extra and then assigned to the actual
441          * variable by assign_effective_io_concurrency.
442          *
443          * The expected number of prefetch pages needed to keep N drives busy is:
444          *
445          * drives |   I/O requests
446          * -------+----------------
447          *              1 |   1
448          *              2 |   2/1 + 2/2 = 3
449          *              3 |   3/1 + 3/2 + 3/3 = 5 1/2
450          *              4 |   4/1 + 4/2 + 4/3 + 4/4 = 8 1/3
451          *              n |   n * H(n)
452          *
453          * This is called the "coupon collector problem" and H(n) is called the
454          * harmonic series.  This could be approximated by n * ln(n), but for
455          * reasonable numbers of drives we might as well just compute the series.
456          *
457          * Alternatively we could set the target to the number of pages necessary
458          * so that the expected number of active spindles is some arbitrary
459          * percentage of the total.  This sounds the same but is actually slightly
460          * different.  The result ends up being ln(1-P)/ln((n-1)/n) where P is
461          * that desired fraction.
462          *
463          * Experimental results show that both of these formulas aren't aggressive
464          * enough, but we don't really have any better proposals.
465          *
466          * Note that if io_concurrency = 0 (disabled), we must set target = 0.
467          *----------
468          */
469
470         for (i = 1; i <= io_concurrency; i++)
471                 new_prefetch_pages += (double) io_concurrency / (double) i;
472
473         *target = new_prefetch_pages;
474
475         /* This range check shouldn't fail, but let's be paranoid */
476         return (new_prefetch_pages > 0.0 && new_prefetch_pages < (double) INT_MAX);
477 }
478
479 /*
480  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
481  *
482  * This is named by analogy to ReadBuffer but doesn't actually allocate a
483  * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
484  * block will not be delayed by the I/O.  Prefetching is optional.
485  * No-op if prefetching isn't compiled in.
486  */
487 void
488 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
489 {
490 #ifdef USE_PREFETCH
491         Assert(RelationIsValid(reln));
492         Assert(BlockNumberIsValid(blockNum));
493
494         /* Open it at the smgr level if not already done */
495         RelationOpenSmgr(reln);
496
497         if (RelationUsesLocalBuffers(reln))
498         {
499                 /* see comments in ReadBufferExtended */
500                 if (RELATION_IS_OTHER_TEMP(reln))
501                         ereport(ERROR,
502                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
503                                 errmsg("cannot access temporary tables of other sessions")));
504
505                 /* pass it off to localbuf.c */
506                 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
507         }
508         else
509         {
510                 BufferTag       newTag;         /* identity of requested block */
511                 uint32          newHash;        /* hash value for newTag */
512                 LWLock     *newPartitionLock;   /* buffer partition lock for it */
513                 int                     buf_id;
514
515                 /* create a tag so we can lookup the buffer */
516                 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
517                                            forkNum, blockNum);
518
519                 /* determine its hash code and partition lock ID */
520                 newHash = BufTableHashCode(&newTag);
521                 newPartitionLock = BufMappingPartitionLock(newHash);
522
523                 /* see if the block is in the buffer pool already */
524                 LWLockAcquire(newPartitionLock, LW_SHARED);
525                 buf_id = BufTableLookup(&newTag, newHash);
526                 LWLockRelease(newPartitionLock);
527
528                 /* If not in buffers, initiate prefetch */
529                 if (buf_id < 0)
530                         smgrprefetch(reln->rd_smgr, forkNum, blockNum);
531
532                 /*
533                  * If the block *is* in buffers, we do nothing.  This is not really
534                  * ideal: the block might be just about to be evicted, which would be
535                  * stupid since we know we are going to need it soon.  But the only
536                  * easy answer is to bump the usage_count, which does not seem like a
537                  * great solution: when the caller does ultimately touch the block,
538                  * usage_count would get bumped again, resulting in too much
539                  * favoritism for blocks that are involved in a prefetch sequence. A
540                  * real fix would involve some additional per-buffer state, and it's
541                  * not clear that there's enough of a problem to justify that.
542                  */
543         }
544 #endif   /* USE_PREFETCH */
545 }
546
547
548 /*
549  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
550  *              fork with RBM_NORMAL mode and default strategy.
551  */
552 Buffer
553 ReadBuffer(Relation reln, BlockNumber blockNum)
554 {
555         return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
556 }
557
558 /*
559  * ReadBufferExtended -- returns a buffer containing the requested
560  *              block of the requested relation.  If the blknum
561  *              requested is P_NEW, extend the relation file and
562  *              allocate a new block.  (Caller is responsible for
563  *              ensuring that only one backend tries to extend a
564  *              relation at the same time!)
565  *
566  * Returns: the buffer number for the buffer containing
567  *              the block read.  The returned buffer has been pinned.
568  *              Does not return on error --- elog's instead.
569  *
570  * Assume when this function is called, that reln has been opened already.
571  *
572  * In RBM_NORMAL mode, the page is read from disk, and the page header is
573  * validated.  An error is thrown if the page header is not valid.  (But
574  * note that an all-zero page is considered "valid"; see PageIsVerified().)
575  *
576  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
577  * valid, the page is zeroed instead of throwing an error. This is intended
578  * for non-critical data, where the caller is prepared to repair errors.
579  *
580  * In RBM_ZERO_AND_LOCK mode, if the page isn't in buffer cache already, it's
581  * filled with zeros instead of reading it from disk.  Useful when the caller
582  * is going to fill the page from scratch, since this saves I/O and avoids
583  * unnecessary failure if the page-on-disk has corrupt page headers.
584  * The page is returned locked to ensure that the caller has a chance to
585  * initialize the page before it's made visible to others.
586  * Caution: do not use this mode to read a page that is beyond the relation's
587  * current physical EOF; that is likely to cause problems in md.c when
588  * the page is modified and written out. P_NEW is OK, though.
589  *
590  * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
591  * a cleanup-strength lock on the page.
592  *
593  * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
594  *
595  * If strategy is not NULL, a nondefault buffer access strategy is used.
596  * See buffer/README for details.
597  */
598 Buffer
599 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
600                                    ReadBufferMode mode, BufferAccessStrategy strategy)
601 {
602         bool            hit;
603         Buffer          buf;
604
605         /* Open it at the smgr level if not already done */
606         RelationOpenSmgr(reln);
607
608         /*
609          * Reject attempts to read non-local temporary relations; we would be
610          * likely to get wrong data since we have no visibility into the owning
611          * session's local buffers.
612          */
613         if (RELATION_IS_OTHER_TEMP(reln))
614                 ereport(ERROR,
615                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
616                                  errmsg("cannot access temporary tables of other sessions")));
617
618         /*
619          * Read the buffer, and update pgstat counters to reflect a cache hit or
620          * miss.
621          */
622         pgstat_count_buffer_read(reln);
623         buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
624                                                         forkNum, blockNum, mode, strategy, &hit);
625         if (hit)
626                 pgstat_count_buffer_hit(reln);
627         return buf;
628 }
629
630
631 /*
632  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
633  *              a relcache entry for the relation.
634  *
635  * NB: At present, this function may only be used on permanent relations, which
636  * is OK, because we only use it during XLOG replay.  If in the future we
637  * want to use it on temporary or unlogged relations, we could pass additional
638  * parameters.
639  */
640 Buffer
641 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
642                                                   BlockNumber blockNum, ReadBufferMode mode,
643                                                   BufferAccessStrategy strategy)
644 {
645         bool            hit;
646
647         SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
648
649         Assert(InRecovery);
650
651         return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
652                                                          mode, strategy, &hit);
653 }
654
655
656 /*
657  * ReadBuffer_common -- common logic for all ReadBuffer variants
658  *
659  * *hit is set to true if the request was satisfied from shared buffer cache.
660  */
661 static Buffer
662 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
663                                   BlockNumber blockNum, ReadBufferMode mode,
664                                   BufferAccessStrategy strategy, bool *hit)
665 {
666         BufferDesc *bufHdr;
667         Block           bufBlock;
668         bool            found;
669         bool            isExtend;
670         bool            isLocalBuf = SmgrIsTemp(smgr);
671
672         *hit = false;
673
674         /* Make sure we will have room to remember the buffer pin */
675         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
676
677         isExtend = (blockNum == P_NEW);
678
679         TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
680                                                                            smgr->smgr_rnode.node.spcNode,
681                                                                            smgr->smgr_rnode.node.dbNode,
682                                                                            smgr->smgr_rnode.node.relNode,
683                                                                            smgr->smgr_rnode.backend,
684                                                                            isExtend);
685
686         /* Substitute proper block number if caller asked for P_NEW */
687         if (isExtend)
688                 blockNum = smgrnblocks(smgr, forkNum);
689
690         if (isLocalBuf)
691         {
692                 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
693                 if (found)
694                         pgBufferUsage.local_blks_hit++;
695                 else
696                         pgBufferUsage.local_blks_read++;
697         }
698         else
699         {
700                 /*
701                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
702                  * not currently in memory.
703                  */
704                 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
705                                                          strategy, &found);
706                 if (found)
707                         pgBufferUsage.shared_blks_hit++;
708                 else
709                         pgBufferUsage.shared_blks_read++;
710         }
711
712         /* At this point we do NOT hold any locks. */
713
714         /* if it was already in the buffer pool, we're done */
715         if (found)
716         {
717                 if (!isExtend)
718                 {
719                         /* Just need to update stats before we exit */
720                         *hit = true;
721                         VacuumPageHit++;
722
723                         if (VacuumCostActive)
724                                 VacuumCostBalance += VacuumCostPageHit;
725
726                         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
727                                                                                           smgr->smgr_rnode.node.spcNode,
728                                                                                           smgr->smgr_rnode.node.dbNode,
729                                                                                           smgr->smgr_rnode.node.relNode,
730                                                                                           smgr->smgr_rnode.backend,
731                                                                                           isExtend,
732                                                                                           found);
733
734                         /*
735                          * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
736                          * locked on return.
737                          */
738                         if (!isLocalBuf)
739                         {
740                                 if (mode == RBM_ZERO_AND_LOCK)
741                                         LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
742                                                                   LW_EXCLUSIVE);
743                                 else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
744                                         LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
745                         }
746
747                         return BufferDescriptorGetBuffer(bufHdr);
748                 }
749
750                 /*
751                  * We get here only in the corner case where we are trying to extend
752                  * the relation but we found a pre-existing buffer marked BM_VALID.
753                  * This can happen because mdread doesn't complain about reads beyond
754                  * EOF (when zero_damaged_pages is ON) and so a previous attempt to
755                  * read a block beyond EOF could have left a "valid" zero-filled
756                  * buffer.  Unfortunately, we have also seen this case occurring
757                  * because of buggy Linux kernels that sometimes return an
758                  * lseek(SEEK_END) result that doesn't account for a recent write. In
759                  * that situation, the pre-existing buffer would contain valid data
760                  * that we don't want to overwrite.  Since the legitimate case should
761                  * always have left a zero-filled buffer, complain if not PageIsNew.
762                  */
763                 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
764                 if (!PageIsNew((Page) bufBlock))
765                         ereport(ERROR,
766                          (errmsg("unexpected data beyond EOF in block %u of relation %s",
767                                          blockNum, relpath(smgr->smgr_rnode, forkNum)),
768                           errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
769
770                 /*
771                  * We *must* do smgrextend before succeeding, else the page will not
772                  * be reserved by the kernel, and the next P_NEW call will decide to
773                  * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
774                  * call that BufferAlloc didn't, and proceed.
775                  */
776                 if (isLocalBuf)
777                 {
778                         /* Only need to adjust flags */
779                         Assert(bufHdr->flags & BM_VALID);
780                         bufHdr->flags &= ~BM_VALID;
781                 }
782                 else
783                 {
784                         /*
785                          * Loop to handle the very small possibility that someone re-sets
786                          * BM_VALID between our clearing it and StartBufferIO inspecting
787                          * it.
788                          */
789                         do
790                         {
791                                 LockBufHdr(bufHdr);
792                                 Assert(bufHdr->flags & BM_VALID);
793                                 bufHdr->flags &= ~BM_VALID;
794                                 UnlockBufHdr(bufHdr);
795                         } while (!StartBufferIO(bufHdr, true));
796                 }
797         }
798
799         /*
800          * if we have gotten to this point, we have allocated a buffer for the
801          * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
802          * if it's a shared buffer.
803          *
804          * Note: if smgrextend fails, we will end up with a buffer that is
805          * allocated but not marked BM_VALID.  P_NEW will still select the same
806          * block number (because the relation didn't get any longer on disk) and
807          * so future attempts to extend the relation will find the same buffer (if
808          * it's not been recycled) but come right back here to try smgrextend
809          * again.
810          */
811         Assert(!(bufHdr->flags & BM_VALID));            /* spinlock not needed */
812
813         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
814
815         if (isExtend)
816         {
817                 /* new buffers are zero-filled */
818                 MemSet((char *) bufBlock, 0, BLCKSZ);
819                 /* don't set checksum for all-zero page */
820                 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
821         }
822         else
823         {
824                 /*
825                  * Read in the page, unless the caller intends to overwrite it and
826                  * just wants us to allocate a buffer.
827                  */
828                 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
829                         MemSet((char *) bufBlock, 0, BLCKSZ);
830                 else
831                 {
832                         instr_time      io_start,
833                                                 io_time;
834
835                         if (track_io_timing)
836                                 INSTR_TIME_SET_CURRENT(io_start);
837
838                         smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
839
840                         if (track_io_timing)
841                         {
842                                 INSTR_TIME_SET_CURRENT(io_time);
843                                 INSTR_TIME_SUBTRACT(io_time, io_start);
844                                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
845                                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
846                         }
847
848                         /* check for garbage data */
849                         if (!PageIsVerified((Page) bufBlock, blockNum))
850                         {
851                                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
852                                 {
853                                         ereport(WARNING,
854                                                         (errcode(ERRCODE_DATA_CORRUPTED),
855                                                          errmsg("invalid page in block %u of relation %s; zeroing out page",
856                                                                         blockNum,
857                                                                         relpath(smgr->smgr_rnode, forkNum))));
858                                         MemSet((char *) bufBlock, 0, BLCKSZ);
859                                 }
860                                 else
861                                         ereport(ERROR,
862                                                         (errcode(ERRCODE_DATA_CORRUPTED),
863                                                          errmsg("invalid page in block %u of relation %s",
864                                                                         blockNum,
865                                                                         relpath(smgr->smgr_rnode, forkNum))));
866                         }
867                 }
868         }
869
870         /*
871          * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
872          * the page as valid, to make sure that no other backend sees the zeroed
873          * page before the caller has had a chance to initialize it.
874          *
875          * Since no-one else can be looking at the page contents yet, there is no
876          * difference between an exclusive lock and a cleanup-strength lock. (Note
877          * that we cannot use LockBuffer() of LockBufferForCleanup() here, because
878          * they assert that the buffer is already valid.)
879          */
880         if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
881                 !isLocalBuf)
882         {
883                 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
884         }
885
886         if (isLocalBuf)
887         {
888                 /* Only need to adjust flags */
889                 bufHdr->flags |= BM_VALID;
890         }
891         else
892         {
893                 /* Set BM_VALID, terminate IO, and wake up any waiters */
894                 TerminateBufferIO(bufHdr, false, BM_VALID);
895         }
896
897         VacuumPageMiss++;
898         if (VacuumCostActive)
899                 VacuumCostBalance += VacuumCostPageMiss;
900
901         TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
902                                                                           smgr->smgr_rnode.node.spcNode,
903                                                                           smgr->smgr_rnode.node.dbNode,
904                                                                           smgr->smgr_rnode.node.relNode,
905                                                                           smgr->smgr_rnode.backend,
906                                                                           isExtend,
907                                                                           found);
908
909         return BufferDescriptorGetBuffer(bufHdr);
910 }
911
912 /*
913  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
914  *              buffer.  If no buffer exists already, selects a replacement
915  *              victim and evicts the old page, but does NOT read in new page.
916  *
917  * "strategy" can be a buffer replacement strategy object, or NULL for
918  * the default strategy.  The selected buffer's usage_count is advanced when
919  * using the default strategy, but otherwise possibly not (see PinBuffer).
920  *
921  * The returned buffer is pinned and is already marked as holding the
922  * desired page.  If it already did have the desired page, *foundPtr is
923  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
924  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
925  *
926  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
927  * we keep it for simplicity in ReadBuffer.
928  *
929  * No locks are held either at entry or exit.
930  */
931 static BufferDesc *
932 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
933                         BlockNumber blockNum,
934                         BufferAccessStrategy strategy,
935                         bool *foundPtr)
936 {
937         BufferTag       newTag;                 /* identity of requested block */
938         uint32          newHash;                /* hash value for newTag */
939         LWLock     *newPartitionLock;           /* buffer partition lock for it */
940         BufferTag       oldTag;                 /* previous identity of selected buffer */
941         uint32          oldHash;                /* hash value for oldTag */
942         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
943         BufFlags        oldFlags;
944         int                     buf_id;
945         BufferDesc *buf;
946         bool            valid;
947
948         /* create a tag so we can lookup the buffer */
949         INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
950
951         /* determine its hash code and partition lock ID */
952         newHash = BufTableHashCode(&newTag);
953         newPartitionLock = BufMappingPartitionLock(newHash);
954
955         /* see if the block is in the buffer pool already */
956         LWLockAcquire(newPartitionLock, LW_SHARED);
957         buf_id = BufTableLookup(&newTag, newHash);
958         if (buf_id >= 0)
959         {
960                 /*
961                  * Found it.  Now, pin the buffer so no one can steal it from the
962                  * buffer pool, and check to see if the correct data has been loaded
963                  * into the buffer.
964                  */
965                 buf = GetBufferDescriptor(buf_id);
966
967                 valid = PinBuffer(buf, strategy);
968
969                 /* Can release the mapping lock as soon as we've pinned it */
970                 LWLockRelease(newPartitionLock);
971
972                 *foundPtr = TRUE;
973
974                 if (!valid)
975                 {
976                         /*
977                          * We can only get here if (a) someone else is still reading in
978                          * the page, or (b) a previous read attempt failed.  We have to
979                          * wait for any active read attempt to finish, and then set up our
980                          * own read attempt if the page is still not BM_VALID.
981                          * StartBufferIO does it all.
982                          */
983                         if (StartBufferIO(buf, true))
984                         {
985                                 /*
986                                  * If we get here, previous attempts to read the buffer must
987                                  * have failed ... but we shall bravely try again.
988                                  */
989                                 *foundPtr = FALSE;
990                         }
991                 }
992
993                 return buf;
994         }
995
996         /*
997          * Didn't find it in the buffer pool.  We'll have to initialize a new
998          * buffer.  Remember to unlock the mapping lock while doing the work.
999          */
1000         LWLockRelease(newPartitionLock);
1001
1002         /* Loop here in case we have to try another victim buffer */
1003         for (;;)
1004         {
1005                 /*
1006                  * Ensure, while the spinlock's not yet held, that there's a free
1007                  * refcount entry.
1008                  */
1009                 ReservePrivateRefCountEntry();
1010
1011                 /*
1012                  * Select a victim buffer.  The buffer is returned with its header
1013                  * spinlock still held!
1014                  */
1015                 buf = StrategyGetBuffer(strategy);
1016
1017                 Assert(buf->refcount == 0);
1018
1019                 /* Must copy buffer flags while we still hold the spinlock */
1020                 oldFlags = buf->flags;
1021
1022                 /* Pin the buffer and then release the buffer spinlock */
1023                 PinBuffer_Locked(buf);
1024
1025                 /*
1026                  * If the buffer was dirty, try to write it out.  There is a race
1027                  * condition here, in that someone might dirty it after we released it
1028                  * above, or even while we are writing it out (since our share-lock
1029                  * won't prevent hint-bit updates).  We will recheck the dirty bit
1030                  * after re-locking the buffer header.
1031                  */
1032                 if (oldFlags & BM_DIRTY)
1033                 {
1034                         /*
1035                          * We need a share-lock on the buffer contents to write it out
1036                          * (else we might write invalid data, eg because someone else is
1037                          * compacting the page contents while we write).  We must use a
1038                          * conditional lock acquisition here to avoid deadlock.  Even
1039                          * though the buffer was not pinned (and therefore surely not
1040                          * locked) when StrategyGetBuffer returned it, someone else could
1041                          * have pinned and exclusive-locked it by the time we get here. If
1042                          * we try to get the lock unconditionally, we'd block waiting for
1043                          * them; if they later block waiting for us, deadlock ensues.
1044                          * (This has been observed to happen when two backends are both
1045                          * trying to split btree index pages, and the second one just
1046                          * happens to be trying to split the page the first one got from
1047                          * StrategyGetBuffer.)
1048                          */
1049                         if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
1050                                                                                  LW_SHARED))
1051                         {
1052                                 /*
1053                                  * If using a nondefault strategy, and writing the buffer
1054                                  * would require a WAL flush, let the strategy decide whether
1055                                  * to go ahead and write/reuse the buffer or to choose another
1056                                  * victim.  We need lock to inspect the page LSN, so this
1057                                  * can't be done inside StrategyGetBuffer.
1058                                  */
1059                                 if (strategy != NULL)
1060                                 {
1061                                         XLogRecPtr      lsn;
1062
1063                                         /* Read the LSN while holding buffer header lock */
1064                                         LockBufHdr(buf);
1065                                         lsn = BufferGetLSN(buf);
1066                                         UnlockBufHdr(buf);
1067
1068                                         if (XLogNeedsFlush(lsn) &&
1069                                                 StrategyRejectBuffer(strategy, buf))
1070                                         {
1071                                                 /* Drop lock/pin and loop around for another buffer */
1072                                                 LWLockRelease(BufferDescriptorGetContentLock(buf));
1073                                                 UnpinBuffer(buf, true);
1074                                                 continue;
1075                                         }
1076                                 }
1077
1078                                 /* OK, do the I/O */
1079                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
1080                                                                                            smgr->smgr_rnode.node.spcNode,
1081                                                                                                 smgr->smgr_rnode.node.dbNode,
1082                                                                                           smgr->smgr_rnode.node.relNode);
1083
1084                                 FlushBuffer(buf, NULL);
1085                                 LWLockRelease(BufferDescriptorGetContentLock(buf));
1086
1087                                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
1088                                                                                            smgr->smgr_rnode.node.spcNode,
1089                                                                                                 smgr->smgr_rnode.node.dbNode,
1090                                                                                           smgr->smgr_rnode.node.relNode);
1091                         }
1092                         else
1093                         {
1094                                 /*
1095                                  * Someone else has locked the buffer, so give it up and loop
1096                                  * back to get another one.
1097                                  */
1098                                 UnpinBuffer(buf, true);
1099                                 continue;
1100                         }
1101                 }
1102
1103                 /*
1104                  * To change the association of a valid buffer, we'll need to have
1105                  * exclusive lock on both the old and new mapping partitions.
1106                  */
1107                 if (oldFlags & BM_TAG_VALID)
1108                 {
1109                         /*
1110                          * Need to compute the old tag's hashcode and partition lock ID.
1111                          * XXX is it worth storing the hashcode in BufferDesc so we need
1112                          * not recompute it here?  Probably not.
1113                          */
1114                         oldTag = buf->tag;
1115                         oldHash = BufTableHashCode(&oldTag);
1116                         oldPartitionLock = BufMappingPartitionLock(oldHash);
1117
1118                         /*
1119                          * Must lock the lower-numbered partition first to avoid
1120                          * deadlocks.
1121                          */
1122                         if (oldPartitionLock < newPartitionLock)
1123                         {
1124                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1125                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1126                         }
1127                         else if (oldPartitionLock > newPartitionLock)
1128                         {
1129                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1130                                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1131                         }
1132                         else
1133                         {
1134                                 /* only one partition, only one lock */
1135                                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1136                         }
1137                 }
1138                 else
1139                 {
1140                         /* if it wasn't valid, we need only the new partition */
1141                         LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1142                         /* these just keep the compiler quiet about uninit variables */
1143                         oldHash = 0;
1144                         oldPartitionLock = 0;
1145                 }
1146
1147                 /*
1148                  * Try to make a hashtable entry for the buffer under its new tag.
1149                  * This could fail because while we were writing someone else
1150                  * allocated another buffer for the same block we want to read in.
1151                  * Note that we have not yet removed the hashtable entry for the old
1152                  * tag.
1153                  */
1154                 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
1155
1156                 if (buf_id >= 0)
1157                 {
1158                         /*
1159                          * Got a collision. Someone has already done what we were about to
1160                          * do. We'll just handle this as if it were found in the buffer
1161                          * pool in the first place.  First, give up the buffer we were
1162                          * planning to use.
1163                          */
1164                         UnpinBuffer(buf, true);
1165
1166                         /* Can give up that buffer's mapping partition lock now */
1167                         if ((oldFlags & BM_TAG_VALID) &&
1168                                 oldPartitionLock != newPartitionLock)
1169                                 LWLockRelease(oldPartitionLock);
1170
1171                         /* remaining code should match code at top of routine */
1172
1173                         buf = GetBufferDescriptor(buf_id);
1174
1175                         valid = PinBuffer(buf, strategy);
1176
1177                         /* Can release the mapping lock as soon as we've pinned it */
1178                         LWLockRelease(newPartitionLock);
1179
1180                         *foundPtr = TRUE;
1181
1182                         if (!valid)
1183                         {
1184                                 /*
1185                                  * We can only get here if (a) someone else is still reading
1186                                  * in the page, or (b) a previous read attempt failed.  We
1187                                  * have to wait for any active read attempt to finish, and
1188                                  * then set up our own read attempt if the page is still not
1189                                  * BM_VALID.  StartBufferIO does it all.
1190                                  */
1191                                 if (StartBufferIO(buf, true))
1192                                 {
1193                                         /*
1194                                          * If we get here, previous attempts to read the buffer
1195                                          * must have failed ... but we shall bravely try again.
1196                                          */
1197                                         *foundPtr = FALSE;
1198                                 }
1199                         }
1200
1201                         return buf;
1202                 }
1203
1204                 /*
1205                  * Need to lock the buffer header too in order to change its tag.
1206                  */
1207                 LockBufHdr(buf);
1208
1209                 /*
1210                  * Somebody could have pinned or re-dirtied the buffer while we were
1211                  * doing the I/O and making the new hashtable entry.  If so, we can't
1212                  * recycle this buffer; we must undo everything we've done and start
1213                  * over with a new victim buffer.
1214                  */
1215                 oldFlags = buf->flags;
1216                 if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
1217                         break;
1218
1219                 UnlockBufHdr(buf);
1220                 BufTableDelete(&newTag, newHash);
1221                 if ((oldFlags & BM_TAG_VALID) &&
1222                         oldPartitionLock != newPartitionLock)
1223                         LWLockRelease(oldPartitionLock);
1224                 LWLockRelease(newPartitionLock);
1225                 UnpinBuffer(buf, true);
1226         }
1227
1228         /*
1229          * Okay, it's finally safe to rename the buffer.
1230          *
1231          * Clearing BM_VALID here is necessary, clearing the dirtybits is just
1232          * paranoia.  We also reset the usage_count since any recency of use of
1233          * the old content is no longer relevant.  (The usage_count starts out at
1234          * 1 so that the buffer can survive one clock-sweep pass.)
1235          */
1236         buf->tag = newTag;
1237         buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
1238         if (relpersistence == RELPERSISTENCE_PERMANENT)
1239                 buf->flags |= BM_TAG_VALID | BM_PERMANENT;
1240         else
1241                 buf->flags |= BM_TAG_VALID;
1242         buf->usage_count = 1;
1243
1244         UnlockBufHdr(buf);
1245
1246         if (oldFlags & BM_TAG_VALID)
1247         {
1248                 BufTableDelete(&oldTag, oldHash);
1249                 if (oldPartitionLock != newPartitionLock)
1250                         LWLockRelease(oldPartitionLock);
1251         }
1252
1253         LWLockRelease(newPartitionLock);
1254
1255         /*
1256          * Buffer contents are currently invalid.  Try to get the io_in_progress
1257          * lock.  If StartBufferIO returns false, then someone else managed to
1258          * read it before we did, so there's nothing left for BufferAlloc() to do.
1259          */
1260         if (StartBufferIO(buf, true))
1261                 *foundPtr = FALSE;
1262         else
1263                 *foundPtr = TRUE;
1264
1265         return buf;
1266 }
1267
1268 /*
1269  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
1270  * freelist.
1271  *
1272  * The buffer header spinlock must be held at entry.  We drop it before
1273  * returning.  (This is sane because the caller must have locked the
1274  * buffer in order to be sure it should be dropped.)
1275  *
1276  * This is used only in contexts such as dropping a relation.  We assume
1277  * that no other backend could possibly be interested in using the page,
1278  * so the only reason the buffer might be pinned is if someone else is
1279  * trying to write it out.  We have to let them finish before we can
1280  * reclaim the buffer.
1281  *
1282  * The buffer could get reclaimed by someone else while we are waiting
1283  * to acquire the necessary locks; if so, don't mess it up.
1284  */
1285 static void
1286 InvalidateBuffer(BufferDesc *buf)
1287 {
1288         BufferTag       oldTag;
1289         uint32          oldHash;                /* hash value for oldTag */
1290         LWLock     *oldPartitionLock;           /* buffer partition lock for it */
1291         BufFlags        oldFlags;
1292
1293         /* Save the original buffer tag before dropping the spinlock */
1294         oldTag = buf->tag;
1295
1296         UnlockBufHdr(buf);
1297
1298         /*
1299          * Need to compute the old tag's hashcode and partition lock ID. XXX is it
1300          * worth storing the hashcode in BufferDesc so we need not recompute it
1301          * here?  Probably not.
1302          */
1303         oldHash = BufTableHashCode(&oldTag);
1304         oldPartitionLock = BufMappingPartitionLock(oldHash);
1305
1306 retry:
1307
1308         /*
1309          * Acquire exclusive mapping lock in preparation for changing the buffer's
1310          * association.
1311          */
1312         LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1313
1314         /* Re-lock the buffer header */
1315         LockBufHdr(buf);
1316
1317         /* If it's changed while we were waiting for lock, do nothing */
1318         if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
1319         {
1320                 UnlockBufHdr(buf);
1321                 LWLockRelease(oldPartitionLock);
1322                 return;
1323         }
1324
1325         /*
1326          * We assume the only reason for it to be pinned is that someone else is
1327          * flushing the page out.  Wait for them to finish.  (This could be an
1328          * infinite loop if the refcount is messed up... it would be nice to time
1329          * out after awhile, but there seems no way to be sure how many loops may
1330          * be needed.  Note that if the other guy has pinned the buffer but not
1331          * yet done StartBufferIO, WaitIO will fall through and we'll effectively
1332          * be busy-looping here.)
1333          */
1334         if (buf->refcount != 0)
1335         {
1336                 UnlockBufHdr(buf);
1337                 LWLockRelease(oldPartitionLock);
1338                 /* safety check: should definitely not be our *own* pin */
1339                 if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
1340                         elog(ERROR, "buffer is pinned in InvalidateBuffer");
1341                 WaitIO(buf);
1342                 goto retry;
1343         }
1344
1345         /*
1346          * Clear out the buffer's tag and flags.  We must do this to ensure that
1347          * linear scans of the buffer array don't think the buffer is valid.
1348          */
1349         oldFlags = buf->flags;
1350         CLEAR_BUFFERTAG(buf->tag);
1351         buf->flags = 0;
1352         buf->usage_count = 0;
1353
1354         UnlockBufHdr(buf);
1355
1356         /*
1357          * Remove the buffer from the lookup hashtable, if it was in there.
1358          */
1359         if (oldFlags & BM_TAG_VALID)
1360                 BufTableDelete(&oldTag, oldHash);
1361
1362         /*
1363          * Done with mapping lock.
1364          */
1365         LWLockRelease(oldPartitionLock);
1366
1367         /*
1368          * Insert the buffer at the head of the list of free buffers.
1369          */
1370         StrategyFreeBuffer(buf);
1371 }
1372
1373 /*
1374  * MarkBufferDirty
1375  *
1376  *              Marks buffer contents as dirty (actual write happens later).
1377  *
1378  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
1379  * exclusive lock, then somebody could be in process of writing the buffer,
1380  * leading to risk of bad data written to disk.)
1381  */
1382 void
1383 MarkBufferDirty(Buffer buffer)
1384 {
1385         BufferDesc *bufHdr;
1386
1387         if (!BufferIsValid(buffer))
1388                 elog(ERROR, "bad buffer ID: %d", buffer);
1389
1390         if (BufferIsLocal(buffer))
1391         {
1392                 MarkLocalBufferDirty(buffer);
1393                 return;
1394         }
1395
1396         bufHdr = GetBufferDescriptor(buffer - 1);
1397
1398         Assert(BufferIsPinned(buffer));
1399         /* unfortunately we can't check if the lock is held exclusively */
1400         Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
1401
1402         LockBufHdr(bufHdr);
1403
1404         Assert(bufHdr->refcount > 0);
1405
1406         /*
1407          * If the buffer was not dirty already, do vacuum accounting.
1408          */
1409         if (!(bufHdr->flags & BM_DIRTY))
1410         {
1411                 VacuumPageDirty++;
1412                 pgBufferUsage.shared_blks_dirtied++;
1413                 if (VacuumCostActive)
1414                         VacuumCostBalance += VacuumCostPageDirty;
1415         }
1416
1417         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1418
1419         UnlockBufHdr(bufHdr);
1420 }
1421
1422 /*
1423  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1424  *
1425  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1426  * compared to calling the two routines separately.  Now it's mainly just
1427  * a convenience function.  However, if the passed buffer is valid and
1428  * already contains the desired block, we just return it as-is; and that
1429  * does save considerable work compared to a full release and reacquire.
1430  *
1431  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1432  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1433  * but can save some tests in the caller.
1434  */
1435 Buffer
1436 ReleaseAndReadBuffer(Buffer buffer,
1437                                          Relation relation,
1438                                          BlockNumber blockNum)
1439 {
1440         ForkNumber      forkNum = MAIN_FORKNUM;
1441         BufferDesc *bufHdr;
1442
1443         if (BufferIsValid(buffer))
1444         {
1445                 Assert(BufferIsPinned(buffer));
1446                 if (BufferIsLocal(buffer))
1447                 {
1448                         bufHdr = GetLocalBufferDescriptor(-buffer - 1);
1449                         if (bufHdr->tag.blockNum == blockNum &&
1450                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1451                                 bufHdr->tag.forkNum == forkNum)
1452                                 return buffer;
1453                         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1454                         LocalRefCount[-buffer - 1]--;
1455                 }
1456                 else
1457                 {
1458                         bufHdr = GetBufferDescriptor(buffer - 1);
1459                         /* we have pin, so it's ok to examine tag without spinlock */
1460                         if (bufHdr->tag.blockNum == blockNum &&
1461                                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1462                                 bufHdr->tag.forkNum == forkNum)
1463                                 return buffer;
1464                         UnpinBuffer(bufHdr, true);
1465                 }
1466         }
1467
1468         return ReadBuffer(relation, blockNum);
1469 }
1470
1471 /*
1472  * PinBuffer -- make buffer unavailable for replacement.
1473  *
1474  * For the default access strategy, the buffer's usage_count is incremented
1475  * when we first pin it; for other strategies we just make sure the usage_count
1476  * isn't zero.  (The idea of the latter is that we don't want synchronized
1477  * heap scans to inflate the count, but we need it to not be zero to discourage
1478  * other backends from stealing buffers from our ring.  As long as we cycle
1479  * through the ring faster than the global clock-sweep cycles, buffers in
1480  * our ring won't be chosen as victims for replacement by other backends.)
1481  *
1482  * This should be applied only to shared buffers, never local ones.
1483  *
1484  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1485  *
1486  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
1487  * some callers to avoid an extra spinlock cycle.
1488  */
1489 static bool
1490 PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
1491 {
1492         Buffer          b = BufferDescriptorGetBuffer(buf);
1493         bool            result;
1494         PrivateRefCountEntry *ref;
1495
1496         ref = GetPrivateRefCountEntry(b, true);
1497
1498         if (ref == NULL)
1499         {
1500                 ReservePrivateRefCountEntry();
1501                 ref = NewPrivateRefCountEntry(b);
1502
1503                 LockBufHdr(buf);
1504                 buf->refcount++;
1505                 if (strategy == NULL)
1506                 {
1507                         if (buf->usage_count < BM_MAX_USAGE_COUNT)
1508                                 buf->usage_count++;
1509                 }
1510                 else
1511                 {
1512                         if (buf->usage_count == 0)
1513                                 buf->usage_count = 1;
1514                 }
1515                 result = (buf->flags & BM_VALID) != 0;
1516                 UnlockBufHdr(buf);
1517         }
1518         else
1519         {
1520                 /* If we previously pinned the buffer, it must surely be valid */
1521                 result = true;
1522         }
1523
1524         ref->refcount++;
1525         Assert(ref->refcount > 0);
1526         ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1527         return result;
1528 }
1529
1530 /*
1531  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1532  * The spinlock is released before return.
1533  *
1534  * As this function is called with the spinlock held, the caller has to
1535  * previously call ReservePrivateRefCountEntry().
1536  *
1537  * Currently, no callers of this function want to modify the buffer's
1538  * usage_count at all, so there's no need for a strategy parameter.
1539  * Also we don't bother with a BM_VALID test (the caller could check that for
1540  * itself).
1541  *
1542  * Also all callers only ever use this function when it's known that the
1543  * buffer can't have a preexisting pin by this backend. That allows us to skip
1544  * searching the private refcount array & hash, which is a boon, because the
1545  * spinlock is still held.
1546  *
1547  * Note: use of this routine is frequently mandatory, not just an optimization
1548  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1549  * its state can change under us.
1550  */
1551 static void
1552 PinBuffer_Locked(BufferDesc *buf)
1553 {
1554         Buffer          b;
1555         PrivateRefCountEntry *ref;
1556
1557         /*
1558          * As explained, We don't expect any preexisting pins. That allows us to
1559          * manipulate the PrivateRefCount after releasing the spinlock
1560          */
1561         Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
1562
1563         buf->refcount++;
1564         UnlockBufHdr(buf);
1565
1566         b = BufferDescriptorGetBuffer(buf);
1567
1568         ref = NewPrivateRefCountEntry(b);
1569         ref->refcount++;
1570
1571         ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1572 }
1573
1574 /*
1575  * UnpinBuffer -- make buffer available for replacement.
1576  *
1577  * This should be applied only to shared buffers, never local ones.
1578  *
1579  * Most but not all callers want CurrentResourceOwner to be adjusted.
1580  * Those that don't should pass fixOwner = FALSE.
1581  */
1582 static void
1583 UnpinBuffer(BufferDesc *buf, bool fixOwner)
1584 {
1585         PrivateRefCountEntry *ref;
1586         Buffer          b = BufferDescriptorGetBuffer(buf);
1587
1588         /* not moving as we're likely deleting it soon anyway */
1589         ref = GetPrivateRefCountEntry(b, false);
1590         Assert(ref != NULL);
1591
1592         if (fixOwner)
1593                 ResourceOwnerForgetBuffer(CurrentResourceOwner, b);
1594
1595         Assert(ref->refcount > 0);
1596         ref->refcount--;
1597         if (ref->refcount == 0)
1598         {
1599                 /* I'd better not still hold any locks on the buffer */
1600                 Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
1601                 Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
1602
1603                 LockBufHdr(buf);
1604
1605                 /* Decrement the shared reference count */
1606                 Assert(buf->refcount > 0);
1607                 buf->refcount--;
1608
1609                 /* Support LockBufferForCleanup() */
1610                 if ((buf->flags & BM_PIN_COUNT_WAITER) &&
1611                         buf->refcount == 1)
1612                 {
1613                         /* we just released the last pin other than the waiter's */
1614                         int                     wait_backend_pid = buf->wait_backend_pid;
1615
1616                         buf->flags &= ~BM_PIN_COUNT_WAITER;
1617                         UnlockBufHdr(buf);
1618                         ProcSendSignal(wait_backend_pid);
1619                 }
1620                 else
1621                         UnlockBufHdr(buf);
1622
1623                 ForgetPrivateRefCountEntry(ref);
1624         }
1625 }
1626
1627 /*
1628  * BufferSync -- Write out all dirty buffers in the pool.
1629  *
1630  * This is called at checkpoint time to write out all dirty shared buffers.
1631  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1632  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
1633  * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
1634  * unlogged buffers, which are otherwise skipped.  The remaining flags
1635  * currently have no effect here.
1636  */
1637 static void
1638 BufferSync(int flags)
1639 {
1640         int                     buf_id;
1641         int                     num_to_scan;
1642         int                     num_to_write;
1643         int                     num_written;
1644         int                     mask = BM_DIRTY;
1645
1646         /* Make sure we can handle the pin inside SyncOneBuffer */
1647         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1648
1649         /*
1650          * Unless this is a shutdown checkpoint or we have been explicitly told,
1651          * we write only permanent, dirty buffers.  But at shutdown or end of
1652          * recovery, we write all dirty buffers.
1653          */
1654         if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
1655                                         CHECKPOINT_FLUSH_ALL))))
1656                 mask |= BM_PERMANENT;
1657
1658         /*
1659          * Loop over all buffers, and mark the ones that need to be written with
1660          * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
1661          * can estimate how much work needs to be done.
1662          *
1663          * This allows us to write only those pages that were dirty when the
1664          * checkpoint began, and not those that get dirtied while it proceeds.
1665          * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1666          * later in this function, or by normal backends or the bgwriter cleaning
1667          * scan, the flag is cleared.  Any buffer dirtied after this point won't
1668          * have the flag set.
1669          *
1670          * Note that if we fail to write some buffer, we may leave buffers with
1671          * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
1672          * certainly need to be written for the next checkpoint attempt, too.
1673          */
1674         num_to_write = 0;
1675         for (buf_id = 0; buf_id < NBuffers; buf_id++)
1676         {
1677                 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
1678
1679                 /*
1680                  * Header spinlock is enough to examine BM_DIRTY, see comment in
1681                  * SyncOneBuffer.
1682                  */
1683                 LockBufHdr(bufHdr);
1684
1685                 if ((bufHdr->flags & mask) == mask)
1686                 {
1687                         bufHdr->flags |= BM_CHECKPOINT_NEEDED;
1688                         num_to_write++;
1689                 }
1690
1691                 UnlockBufHdr(bufHdr);
1692         }
1693
1694         if (num_to_write == 0)
1695                 return;                                 /* nothing to do */
1696
1697         TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
1698
1699         /*
1700          * Loop over all buffers again, and write the ones (still) marked with
1701          * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
1702          * since we might as well dump soon-to-be-recycled buffers first.
1703          *
1704          * Note that we don't read the buffer alloc count here --- that should be
1705          * left untouched till the next BgBufferSync() call.
1706          */
1707         buf_id = StrategySyncStart(NULL, NULL);
1708         num_to_scan = NBuffers;
1709         num_written = 0;
1710         while (num_to_scan-- > 0)
1711         {
1712                 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
1713
1714                 /*
1715                  * We don't need to acquire the lock here, because we're only looking
1716                  * at a single bit. It's possible that someone else writes the buffer
1717                  * and clears the flag right after we check, but that doesn't matter
1718                  * since SyncOneBuffer will then do nothing.  However, there is a
1719                  * further race condition: it's conceivable that between the time we
1720                  * examine the bit here and the time SyncOneBuffer acquires lock,
1721                  * someone else not only wrote the buffer but replaced it with another
1722                  * page and dirtied it.  In that improbable case, SyncOneBuffer will
1723                  * write the buffer though we didn't need to.  It doesn't seem worth
1724                  * guarding against this, though.
1725                  */
1726                 if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
1727                 {
1728                         if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
1729                         {
1730                                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1731                                 BgWriterStats.m_buf_written_checkpoints++;
1732                                 num_written++;
1733
1734                                 /*
1735                                  * We know there are at most num_to_write buffers with
1736                                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
1737                                  * num_written reaches num_to_write.
1738                                  *
1739                                  * Note that num_written doesn't include buffers written by
1740                                  * other backends, or by the bgwriter cleaning scan. That
1741                                  * means that the estimate of how much progress we've made is
1742                                  * conservative, and also that this test will often fail to
1743                                  * trigger.  But it seems worth making anyway.
1744                                  */
1745                                 if (num_written >= num_to_write)
1746                                         break;
1747
1748                                 /*
1749                                  * Sleep to throttle our I/O rate.
1750                                  */
1751                                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
1752                         }
1753                 }
1754
1755                 if (++buf_id >= NBuffers)
1756                         buf_id = 0;
1757         }
1758
1759         /*
1760          * Update checkpoint statistics. As noted above, this doesn't include
1761          * buffers written by other backends or bgwriter scan.
1762          */
1763         CheckpointStats.ckpt_bufs_written += num_written;
1764
1765         TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
1766 }
1767
1768 /*
1769  * BgBufferSync -- Write out some dirty buffers in the pool.
1770  *
1771  * This is called periodically by the background writer process.
1772  *
1773  * Returns true if it's appropriate for the bgwriter process to go into
1774  * low-power hibernation mode.  (This happens if the strategy clock sweep
1775  * has been "lapped" and no buffer allocations have occurred recently,
1776  * or if the bgwriter has been effectively disabled by setting
1777  * bgwriter_lru_maxpages to 0.)
1778  */
1779 bool
1780 BgBufferSync(void)
1781 {
1782         /* info obtained from freelist.c */
1783         int                     strategy_buf_id;
1784         uint32          strategy_passes;
1785         uint32          recent_alloc;
1786
1787         /*
1788          * Information saved between calls so we can determine the strategy
1789          * point's advance rate and avoid scanning already-cleaned buffers.
1790          */
1791         static bool saved_info_valid = false;
1792         static int      prev_strategy_buf_id;
1793         static uint32 prev_strategy_passes;
1794         static int      next_to_clean;
1795         static uint32 next_passes;
1796
1797         /* Moving averages of allocation rate and clean-buffer density */
1798         static float smoothed_alloc = 0;
1799         static float smoothed_density = 10.0;
1800
1801         /* Potentially these could be tunables, but for now, not */
1802         float           smoothing_samples = 16;
1803         float           scan_whole_pool_milliseconds = 120000.0;
1804
1805         /* Used to compute how far we scan ahead */
1806         long            strategy_delta;
1807         int                     bufs_to_lap;
1808         int                     bufs_ahead;
1809         float           scans_per_alloc;
1810         int                     reusable_buffers_est;
1811         int                     upcoming_alloc_est;
1812         int                     min_scan_buffers;
1813
1814         /* Variables for the scanning loop proper */
1815         int                     num_to_scan;
1816         int                     num_written;
1817         int                     reusable_buffers;
1818
1819         /* Variables for final smoothed_density update */
1820         long            new_strategy_delta;
1821         uint32          new_recent_alloc;
1822
1823         /*
1824          * Find out where the freelist clock sweep currently is, and how many
1825          * buffer allocations have happened since our last call.
1826          */
1827         strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
1828
1829         /* Report buffer alloc counts to pgstat */
1830         BgWriterStats.m_buf_alloc += recent_alloc;
1831
1832         /*
1833          * If we're not running the LRU scan, just stop after doing the stats
1834          * stuff.  We mark the saved state invalid so that we can recover sanely
1835          * if LRU scan is turned back on later.
1836          */
1837         if (bgwriter_lru_maxpages <= 0)
1838         {
1839                 saved_info_valid = false;
1840                 return true;
1841         }
1842
1843         /*
1844          * Compute strategy_delta = how many buffers have been scanned by the
1845          * clock sweep since last time.  If first time through, assume none. Then
1846          * see if we are still ahead of the clock sweep, and if so, how many
1847          * buffers we could scan before we'd catch up with it and "lap" it. Note:
1848          * weird-looking coding of xxx_passes comparisons are to avoid bogus
1849          * behavior when the passes counts wrap around.
1850          */
1851         if (saved_info_valid)
1852         {
1853                 int32           passes_delta = strategy_passes - prev_strategy_passes;
1854
1855                 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
1856                 strategy_delta += (long) passes_delta *NBuffers;
1857
1858                 Assert(strategy_delta >= 0);
1859
1860                 if ((int32) (next_passes - strategy_passes) > 0)
1861                 {
1862                         /* we're one pass ahead of the strategy point */
1863                         bufs_to_lap = strategy_buf_id - next_to_clean;
1864 #ifdef BGW_DEBUG
1865                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1866                                  next_passes, next_to_clean,
1867                                  strategy_passes, strategy_buf_id,
1868                                  strategy_delta, bufs_to_lap);
1869 #endif
1870                 }
1871                 else if (next_passes == strategy_passes &&
1872                                  next_to_clean >= strategy_buf_id)
1873                 {
1874                         /* on same pass, but ahead or at least not behind */
1875                         bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
1876 #ifdef BGW_DEBUG
1877                         elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
1878                                  next_passes, next_to_clean,
1879                                  strategy_passes, strategy_buf_id,
1880                                  strategy_delta, bufs_to_lap);
1881 #endif
1882                 }
1883                 else
1884                 {
1885                         /*
1886                          * We're behind, so skip forward to the strategy point and start
1887                          * cleaning from there.
1888                          */
1889 #ifdef BGW_DEBUG
1890                         elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
1891                                  next_passes, next_to_clean,
1892                                  strategy_passes, strategy_buf_id,
1893                                  strategy_delta);
1894 #endif
1895                         next_to_clean = strategy_buf_id;
1896                         next_passes = strategy_passes;
1897                         bufs_to_lap = NBuffers;
1898                 }
1899         }
1900         else
1901         {
1902                 /*
1903                  * Initializing at startup or after LRU scanning had been off. Always
1904                  * start at the strategy point.
1905                  */
1906 #ifdef BGW_DEBUG
1907                 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
1908                          strategy_passes, strategy_buf_id);
1909 #endif
1910                 strategy_delta = 0;
1911                 next_to_clean = strategy_buf_id;
1912                 next_passes = strategy_passes;
1913                 bufs_to_lap = NBuffers;
1914         }
1915
1916         /* Update saved info for next time */
1917         prev_strategy_buf_id = strategy_buf_id;
1918         prev_strategy_passes = strategy_passes;
1919         saved_info_valid = true;
1920
1921         /*
1922          * Compute how many buffers had to be scanned for each new allocation, ie,
1923          * 1/density of reusable buffers, and track a moving average of that.
1924          *
1925          * If the strategy point didn't move, we don't update the density estimate
1926          */
1927         if (strategy_delta > 0 && recent_alloc > 0)
1928         {
1929                 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
1930                 smoothed_density += (scans_per_alloc - smoothed_density) /
1931                         smoothing_samples;
1932         }
1933
1934         /*
1935          * Estimate how many reusable buffers there are between the current
1936          * strategy point and where we've scanned ahead to, based on the smoothed
1937          * density estimate.
1938          */
1939         bufs_ahead = NBuffers - bufs_to_lap;
1940         reusable_buffers_est = (float) bufs_ahead / smoothed_density;
1941
1942         /*
1943          * Track a moving average of recent buffer allocations.  Here, rather than
1944          * a true average we want a fast-attack, slow-decline behavior: we
1945          * immediately follow any increase.
1946          */
1947         if (smoothed_alloc <= (float) recent_alloc)
1948                 smoothed_alloc = recent_alloc;
1949         else
1950                 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
1951                         smoothing_samples;
1952
1953         /* Scale the estimate by a GUC to allow more aggressive tuning. */
1954         upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
1955
1956         /*
1957          * If recent_alloc remains at zero for many cycles, smoothed_alloc will
1958          * eventually underflow to zero, and the underflows produce annoying
1959          * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
1960          * zero, there's no point in tracking smaller and smaller values of
1961          * smoothed_alloc, so just reset it to exactly zero to avoid this
1962          * syndrome.  It will pop back up as soon as recent_alloc increases.
1963          */
1964         if (upcoming_alloc_est == 0)
1965                 smoothed_alloc = 0;
1966
1967         /*
1968          * Even in cases where there's been little or no buffer allocation
1969          * activity, we want to make a small amount of progress through the buffer
1970          * cache so that as many reusable buffers as possible are clean after an
1971          * idle period.
1972          *
1973          * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
1974          * the BGW will be called during the scan_whole_pool time; slice the
1975          * buffer pool into that many sections.
1976          */
1977         min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
1978
1979         if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
1980         {
1981 #ifdef BGW_DEBUG
1982                 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
1983                          upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
1984 #endif
1985                 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
1986         }
1987
1988         /*
1989          * Now write out dirty reusable buffers, working forward from the
1990          * next_to_clean point, until we have lapped the strategy scan, or cleaned
1991          * enough buffers to match our estimate of the next cycle's allocation
1992          * requirements, or hit the bgwriter_lru_maxpages limit.
1993          */
1994
1995         /* Make sure we can handle the pin inside SyncOneBuffer */
1996         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1997
1998         num_to_scan = bufs_to_lap;
1999         num_written = 0;
2000         reusable_buffers = reusable_buffers_est;
2001
2002         /* Execute the LRU scan */
2003         while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
2004         {
2005                 int                     buffer_state = SyncOneBuffer(next_to_clean, true);
2006
2007                 if (++next_to_clean >= NBuffers)
2008                 {
2009                         next_to_clean = 0;
2010                         next_passes++;
2011                 }
2012                 num_to_scan--;
2013
2014                 if (buffer_state & BUF_WRITTEN)
2015                 {
2016                         reusable_buffers++;
2017                         if (++num_written >= bgwriter_lru_maxpages)
2018                         {
2019                                 BgWriterStats.m_maxwritten_clean++;
2020                                 break;
2021                         }
2022                 }
2023                 else if (buffer_state & BUF_REUSABLE)
2024                         reusable_buffers++;
2025         }
2026
2027         BgWriterStats.m_buf_written_clean += num_written;
2028
2029 #ifdef BGW_DEBUG
2030         elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
2031                  recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
2032                  smoothed_density, reusable_buffers_est, upcoming_alloc_est,
2033                  bufs_to_lap - num_to_scan,
2034                  num_written,
2035                  reusable_buffers - reusable_buffers_est);
2036 #endif
2037
2038         /*
2039          * Consider the above scan as being like a new allocation scan.
2040          * Characterize its density and update the smoothed one based on it. This
2041          * effectively halves the moving average period in cases where both the
2042          * strategy and the background writer are doing some useful scanning,
2043          * which is helpful because a long memory isn't as desirable on the
2044          * density estimates.
2045          */
2046         new_strategy_delta = bufs_to_lap - num_to_scan;
2047         new_recent_alloc = reusable_buffers - reusable_buffers_est;
2048         if (new_strategy_delta > 0 && new_recent_alloc > 0)
2049         {
2050                 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
2051                 smoothed_density += (scans_per_alloc - smoothed_density) /
2052                         smoothing_samples;
2053
2054 #ifdef BGW_DEBUG
2055                 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
2056                          new_recent_alloc, new_strategy_delta,
2057                          scans_per_alloc, smoothed_density);
2058 #endif
2059         }
2060
2061         /* Return true if OK to hibernate */
2062         return (bufs_to_lap == 0 && recent_alloc == 0);
2063 }
2064
2065 /*
2066  * SyncOneBuffer -- process a single buffer during syncing.
2067  *
2068  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
2069  * buffers marked recently used, as these are not replacement candidates.
2070  *
2071  * Returns a bitmask containing the following flag bits:
2072  *      BUF_WRITTEN: we wrote the buffer.
2073  *      BUF_REUSABLE: buffer is available for replacement, ie, it has
2074  *              pin count 0 and usage count 0.
2075  *
2076  * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean
2077  * after locking it, but we don't care all that much.)
2078  *
2079  * Note: caller must have done ResourceOwnerEnlargeBuffers.
2080  */
2081 static int
2082 SyncOneBuffer(int buf_id, bool skip_recently_used)
2083 {
2084         BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
2085         int                     result = 0;
2086
2087         ReservePrivateRefCountEntry();
2088
2089         /*
2090          * Check whether buffer needs writing.
2091          *
2092          * We can make this check without taking the buffer content lock so long
2093          * as we mark pages dirty in access methods *before* logging changes with
2094          * XLogInsert(): if someone marks the buffer dirty just after our check we
2095          * don't worry because our checkpoint.redo points before log record for
2096          * upcoming changes and so we are not required to write such dirty buffer.
2097          */
2098         LockBufHdr(bufHdr);
2099
2100         if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
2101                 result |= BUF_REUSABLE;
2102         else if (skip_recently_used)
2103         {
2104                 /* Caller told us not to write recently-used buffers */
2105                 UnlockBufHdr(bufHdr);
2106                 return result;
2107         }
2108
2109         if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
2110         {
2111                 /* It's clean, so nothing to do */
2112                 UnlockBufHdr(bufHdr);
2113                 return result;
2114         }
2115
2116         /*
2117          * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
2118          * buffer is clean by the time we've locked it.)
2119          */
2120         PinBuffer_Locked(bufHdr);
2121         LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2122
2123         FlushBuffer(bufHdr, NULL);
2124
2125         LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2126         UnpinBuffer(bufHdr, true);
2127
2128         return result | BUF_WRITTEN;
2129 }
2130
2131 /*
2132  *              AtEOXact_Buffers - clean up at end of transaction.
2133  *
2134  *              As of PostgreSQL 8.0, buffer pins should get released by the
2135  *              ResourceOwner mechanism.  This routine is just a debugging
2136  *              cross-check that no pins remain.
2137  */
2138 void
2139 AtEOXact_Buffers(bool isCommit)
2140 {
2141         CheckForBufferLeaks();
2142
2143         AtEOXact_LocalBuffers(isCommit);
2144
2145         Assert(PrivateRefCountOverflowed == 0);
2146 }
2147
2148 /*
2149  * Initialize access to shared buffer pool
2150  *
2151  * This is called during backend startup (whether standalone or under the
2152  * postmaster).  It sets up for this backend's access to the already-existing
2153  * buffer pool.
2154  *
2155  * NB: this is called before InitProcess(), so we do not have a PGPROC and
2156  * cannot do LWLockAcquire; hence we can't actually access stuff in
2157  * shared memory yet.  We are only initializing local data here.
2158  * (See also InitBufferPoolBackend)
2159  */
2160 void
2161 InitBufferPoolAccess(void)
2162 {
2163         HASHCTL         hash_ctl;
2164
2165         memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
2166
2167         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2168         hash_ctl.keysize = sizeof(int32);
2169         hash_ctl.entrysize = sizeof(PrivateRefCountArray);
2170
2171         PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
2172                                                                           HASH_ELEM | HASH_BLOBS);
2173 }
2174
2175 /*
2176  * InitBufferPoolBackend --- second-stage initialization of a new backend
2177  *
2178  * This is called after we have acquired a PGPROC and so can safely get
2179  * LWLocks.  We don't currently need to do anything at this stage ...
2180  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
2181  * access, and thereby has to be called at the corresponding phase of
2182  * backend shutdown.
2183  */
2184 void
2185 InitBufferPoolBackend(void)
2186 {
2187         on_shmem_exit(AtProcExit_Buffers, 0);
2188 }
2189
2190 /*
2191  * During backend exit, ensure that we released all shared-buffer locks and
2192  * assert that we have no remaining pins.
2193  */
2194 static void
2195 AtProcExit_Buffers(int code, Datum arg)
2196 {
2197         AbortBufferIO();
2198         UnlockBuffers();
2199
2200         CheckForBufferLeaks();
2201
2202         /* localbuf.c needs a chance too */
2203         AtProcExit_LocalBuffers();
2204 }
2205
2206 /*
2207  *              CheckForBufferLeaks - ensure this backend holds no buffer pins
2208  *
2209  *              As of PostgreSQL 8.0, buffer pins should get released by the
2210  *              ResourceOwner mechanism.  This routine is just a debugging
2211  *              cross-check that no pins remain.
2212  */
2213 static void
2214 CheckForBufferLeaks(void)
2215 {
2216 #ifdef USE_ASSERT_CHECKING
2217         int                     RefCountErrors = 0;
2218         PrivateRefCountEntry *res;
2219         int                     i;
2220
2221         /* check the array */
2222         for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
2223         {
2224                 res = &PrivateRefCountArray[i];
2225
2226                 if (res->buffer != InvalidBuffer)
2227                 {
2228                         PrintBufferLeakWarning(res->buffer);
2229                         RefCountErrors++;
2230                 }
2231         }
2232
2233         /* if necessary search the hash */
2234         if (PrivateRefCountOverflowed)
2235         {
2236                 HASH_SEQ_STATUS hstat;
2237
2238                 hash_seq_init(&hstat, PrivateRefCountHash);
2239                 while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
2240                 {
2241                         PrintBufferLeakWarning(res->buffer);
2242                         RefCountErrors++;
2243                 }
2244
2245         }
2246
2247         Assert(RefCountErrors == 0);
2248 #endif
2249 }
2250
2251 /*
2252  * Helper routine to issue warnings when a buffer is unexpectedly pinned
2253  */
2254 void
2255 PrintBufferLeakWarning(Buffer buffer)
2256 {
2257         BufferDesc *buf;
2258         int32           loccount;
2259         char       *path;
2260         BackendId       backend;
2261
2262         Assert(BufferIsValid(buffer));
2263         if (BufferIsLocal(buffer))
2264         {
2265                 buf = GetLocalBufferDescriptor(-buffer - 1);
2266                 loccount = LocalRefCount[-buffer - 1];
2267                 backend = MyBackendId;
2268         }
2269         else
2270         {
2271                 buf = GetBufferDescriptor(buffer - 1);
2272                 loccount = GetPrivateRefCount(buffer);
2273                 backend = InvalidBackendId;
2274         }
2275
2276         /* theoretically we should lock the bufhdr here */
2277         path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
2278         elog(WARNING,
2279                  "buffer refcount leak: [%03d] "
2280                  "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
2281                  buffer, path,
2282                  buf->tag.blockNum, buf->flags,
2283                  buf->refcount, loccount);
2284         pfree(path);
2285 }
2286
2287 /*
2288  * CheckPointBuffers
2289  *
2290  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
2291  *
2292  * Note: temporary relations do not participate in checkpoints, so they don't
2293  * need to be flushed.
2294  */
2295 void
2296 CheckPointBuffers(int flags)
2297 {
2298         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
2299         CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
2300         BufferSync(flags);
2301         CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
2302         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
2303         smgrsync();
2304         CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
2305         TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
2306 }
2307
2308
2309 /*
2310  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
2311  */
2312 void
2313 BufmgrCommit(void)
2314 {
2315         /* Nothing to do in bufmgr anymore... */
2316 }
2317
2318 /*
2319  * BufferGetBlockNumber
2320  *              Returns the block number associated with a buffer.
2321  *
2322  * Note:
2323  *              Assumes that the buffer is valid and pinned, else the
2324  *              value may be obsolete immediately...
2325  */
2326 BlockNumber
2327 BufferGetBlockNumber(Buffer buffer)
2328 {
2329         BufferDesc *bufHdr;
2330
2331         Assert(BufferIsPinned(buffer));
2332
2333         if (BufferIsLocal(buffer))
2334                 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2335         else
2336                 bufHdr = GetBufferDescriptor(buffer - 1);
2337
2338         /* pinned, so OK to read tag without spinlock */
2339         return bufHdr->tag.blockNum;
2340 }
2341
2342 /*
2343  * BufferGetTag
2344  *              Returns the relfilenode, fork number and block number associated with
2345  *              a buffer.
2346  */
2347 void
2348 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
2349                          BlockNumber *blknum)
2350 {
2351         BufferDesc *bufHdr;
2352
2353         /* Do the same checks as BufferGetBlockNumber. */
2354         Assert(BufferIsPinned(buffer));
2355
2356         if (BufferIsLocal(buffer))
2357                 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2358         else
2359                 bufHdr = GetBufferDescriptor(buffer - 1);
2360
2361         /* pinned, so OK to read tag without spinlock */
2362         *rnode = bufHdr->tag.rnode;
2363         *forknum = bufHdr->tag.forkNum;
2364         *blknum = bufHdr->tag.blockNum;
2365 }
2366
2367 /*
2368  * FlushBuffer
2369  *              Physically write out a shared buffer.
2370  *
2371  * NOTE: this actually just passes the buffer contents to the kernel; the
2372  * real write to disk won't happen until the kernel feels like it.  This
2373  * is okay from our point of view since we can redo the changes from WAL.
2374  * However, we will need to force the changes to disk via fsync before
2375  * we can checkpoint WAL.
2376  *
2377  * The caller must hold a pin on the buffer and have share-locked the
2378  * buffer contents.  (Note: a share-lock does not prevent updates of
2379  * hint bits in the buffer, so the page could change while the write
2380  * is in progress, but we assume that that will not invalidate the data
2381  * written.)
2382  *
2383  * If the caller has an smgr reference for the buffer's relation, pass it
2384  * as the second parameter.  If not, pass NULL.
2385  */
2386 static void
2387 FlushBuffer(BufferDesc *buf, SMgrRelation reln)
2388 {
2389         XLogRecPtr      recptr;
2390         ErrorContextCallback errcallback;
2391         instr_time      io_start,
2392                                 io_time;
2393         Block           bufBlock;
2394         char       *bufToWrite;
2395
2396         /*
2397          * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
2398          * false, then someone else flushed the buffer before we could, so we need
2399          * not do anything.
2400          */
2401         if (!StartBufferIO(buf, false))
2402                 return;
2403
2404         /* Setup error traceback support for ereport() */
2405         errcallback.callback = shared_buffer_write_error_callback;
2406         errcallback.arg = (void *) buf;
2407         errcallback.previous = error_context_stack;
2408         error_context_stack = &errcallback;
2409
2410         /* Find smgr relation for buffer */
2411         if (reln == NULL)
2412                 reln = smgropen(buf->tag.rnode, InvalidBackendId);
2413
2414         TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
2415                                                                                 buf->tag.blockNum,
2416                                                                                 reln->smgr_rnode.node.spcNode,
2417                                                                                 reln->smgr_rnode.node.dbNode,
2418                                                                                 reln->smgr_rnode.node.relNode);
2419
2420         LockBufHdr(buf);
2421
2422         /*
2423          * Run PageGetLSN while holding header lock, since we don't have the
2424          * buffer locked exclusively in all cases.
2425          */
2426         recptr = BufferGetLSN(buf);
2427
2428         /* To check if block content changes while flushing. - vadim 01/17/97 */
2429         buf->flags &= ~BM_JUST_DIRTIED;
2430         UnlockBufHdr(buf);
2431
2432         /*
2433          * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
2434          * rule that log updates must hit disk before any of the data-file changes
2435          * they describe do.
2436          *
2437          * However, this rule does not apply to unlogged relations, which will be
2438          * lost after a crash anyway.  Most unlogged relation pages do not bear
2439          * LSNs since we never emit WAL records for them, and therefore flushing
2440          * up through the buffer LSN would be useless, but harmless.  However,
2441          * GiST indexes use LSNs internally to track page-splits, and therefore
2442          * unlogged GiST pages bear "fake" LSNs generated by
2443          * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
2444          * LSN counter could advance past the WAL insertion point; and if it did
2445          * happen, attempting to flush WAL through that location would fail, with
2446          * disastrous system-wide consequences.  To make sure that can't happen,
2447          * skip the flush if the buffer isn't permanent.
2448          */
2449         if (buf->flags & BM_PERMANENT)
2450                 XLogFlush(recptr);
2451
2452         /*
2453          * Now it's safe to write buffer to disk. Note that no one else should
2454          * have been able to write it while we were busy with log flushing because
2455          * we have the io_in_progress lock.
2456          */
2457         bufBlock = BufHdrGetBlock(buf);
2458
2459         /*
2460          * Update page checksum if desired.  Since we have only shared lock on the
2461          * buffer, other processes might be updating hint bits in it, so we must
2462          * copy the page to private storage if we do checksumming.
2463          */
2464         bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
2465
2466         if (track_io_timing)
2467                 INSTR_TIME_SET_CURRENT(io_start);
2468
2469         /*
2470          * bufToWrite is either the shared buffer or a copy, as appropriate.
2471          */
2472         smgrwrite(reln,
2473                           buf->tag.forkNum,
2474                           buf->tag.blockNum,
2475                           bufToWrite,
2476                           false);
2477
2478         if (track_io_timing)
2479         {
2480                 INSTR_TIME_SET_CURRENT(io_time);
2481                 INSTR_TIME_SUBTRACT(io_time, io_start);
2482                 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2483                 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2484         }
2485
2486         pgBufferUsage.shared_blks_written++;
2487
2488         /*
2489          * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2490          * end the io_in_progress state.
2491          */
2492         TerminateBufferIO(buf, true, 0);
2493
2494         TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2495                                                                            buf->tag.blockNum,
2496                                                                            reln->smgr_rnode.node.spcNode,
2497                                                                            reln->smgr_rnode.node.dbNode,
2498                                                                            reln->smgr_rnode.node.relNode);
2499
2500         /* Pop the error context stack */
2501         error_context_stack = errcallback.previous;
2502 }
2503
2504 /*
2505  * RelationGetNumberOfBlocksInFork
2506  *              Determines the current number of pages in the specified relation fork.
2507  */
2508 BlockNumber
2509 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2510 {
2511         /* Open it at the smgr level if not already done */
2512         RelationOpenSmgr(relation);
2513
2514         return smgrnblocks(relation->rd_smgr, forkNum);
2515 }
2516
2517 /*
2518  * BufferIsPermanent
2519  *              Determines whether a buffer will potentially still be around after
2520  *              a crash.  Caller must hold a buffer pin.
2521  */
2522 bool
2523 BufferIsPermanent(Buffer buffer)
2524 {
2525         BufferDesc *bufHdr;
2526
2527         /* Local buffers are used only for temp relations. */
2528         if (BufferIsLocal(buffer))
2529                 return false;
2530
2531         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2532         Assert(BufferIsValid(buffer));
2533         Assert(BufferIsPinned(buffer));
2534
2535         /*
2536          * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2537          * need not bother with the buffer header spinlock.  Even if someone else
2538          * changes the buffer header flags while we're doing this, we assume that
2539          * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
2540          * old value or the new value, but not random garbage.
2541          */
2542         bufHdr = GetBufferDescriptor(buffer - 1);
2543         return (bufHdr->flags & BM_PERMANENT) != 0;
2544 }
2545
2546 /*
2547  * BufferGetLSNAtomic
2548  *              Retrieves the LSN of the buffer atomically using a buffer header lock.
2549  *              This is necessary for some callers who may not have an exclusive lock
2550  *              on the buffer.
2551  */
2552 XLogRecPtr
2553 BufferGetLSNAtomic(Buffer buffer)
2554 {
2555         BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
2556         char       *page = BufferGetPage(buffer);
2557         XLogRecPtr      lsn;
2558
2559         /*
2560          * If we don't need locking for correctness, fastpath out.
2561          */
2562         if (!XLogHintBitIsNeeded() || BufferIsLocal(buffer))
2563                 return PageGetLSN(page);
2564
2565         /* Make sure we've got a real buffer, and that we hold a pin on it. */
2566         Assert(BufferIsValid(buffer));
2567         Assert(BufferIsPinned(buffer));
2568
2569         LockBufHdr(bufHdr);
2570         lsn = PageGetLSN(page);
2571         UnlockBufHdr(bufHdr);
2572
2573         return lsn;
2574 }
2575
2576 /* ---------------------------------------------------------------------
2577  *              DropRelFileNodeBuffers
2578  *
2579  *              This function removes from the buffer pool all the pages of the
2580  *              specified relation fork that have block numbers >= firstDelBlock.
2581  *              (In particular, with firstDelBlock = 0, all pages are removed.)
2582  *              Dirty pages are simply dropped, without bothering to write them
2583  *              out first.  Therefore, this is NOT rollback-able, and so should be
2584  *              used only with extreme caution!
2585  *
2586  *              Currently, this is called only from smgr.c when the underlying file
2587  *              is about to be deleted or truncated (firstDelBlock is needed for
2588  *              the truncation case).  The data in the affected pages would therefore
2589  *              be deleted momentarily anyway, and there is no point in writing it.
2590  *              It is the responsibility of higher-level code to ensure that the
2591  *              deletion or truncation does not lose any data that could be needed
2592  *              later.  It is also the responsibility of higher-level code to ensure
2593  *              that no other process could be trying to load more pages of the
2594  *              relation into buffers.
2595  *
2596  *              XXX currently it sequentially searches the buffer pool, should be
2597  *              changed to more clever ways of searching.  However, this routine
2598  *              is used only in code paths that aren't very performance-critical,
2599  *              and we shouldn't slow down the hot paths to make it faster ...
2600  * --------------------------------------------------------------------
2601  */
2602 void
2603 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2604                                            BlockNumber firstDelBlock)
2605 {
2606         int                     i;
2607
2608         /* If it's a local relation, it's localbuf.c's problem. */
2609         if (RelFileNodeBackendIsTemp(rnode))
2610         {
2611                 if (rnode.backend == MyBackendId)
2612                         DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2613                 return;
2614         }
2615
2616         for (i = 0; i < NBuffers; i++)
2617         {
2618                 BufferDesc *bufHdr = GetBufferDescriptor(i);
2619
2620                 /*
2621                  * We can make this a tad faster by prechecking the buffer tag before
2622                  * we attempt to lock the buffer; this saves a lot of lock
2623                  * acquisitions in typical cases.  It should be safe because the
2624                  * caller must have AccessExclusiveLock on the relation, or some other
2625                  * reason to be certain that no one is loading new pages of the rel
2626                  * into the buffer pool.  (Otherwise we might well miss such pages
2627                  * entirely.)  Therefore, while the tag might be changing while we
2628                  * look at it, it can't be changing *to* a value we care about, only
2629                  * *away* from such a value.  So false negatives are impossible, and
2630                  * false positives are safe because we'll recheck after getting the
2631                  * buffer lock.
2632                  *
2633                  * We could check forkNum and blockNum as well as the rnode, but the
2634                  * incremental win from doing so seems small.
2635                  */
2636                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2637                         continue;
2638
2639                 LockBufHdr(bufHdr);
2640                 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2641                         bufHdr->tag.forkNum == forkNum &&
2642                         bufHdr->tag.blockNum >= firstDelBlock)
2643                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2644                 else
2645                         UnlockBufHdr(bufHdr);
2646         }
2647 }
2648
2649 /* ---------------------------------------------------------------------
2650  *              DropRelFileNodesAllBuffers
2651  *
2652  *              This function removes from the buffer pool all the pages of all
2653  *              forks of the specified relations.  It's equivalent to calling
2654  *              DropRelFileNodeBuffers once per fork per relation with
2655  *              firstDelBlock = 0.
2656  * --------------------------------------------------------------------
2657  */
2658 void
2659 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2660 {
2661         int                     i,
2662                                 n = 0;
2663         RelFileNode *nodes;
2664         bool            use_bsearch;
2665
2666         if (nnodes == 0)
2667                 return;
2668
2669         nodes = palloc(sizeof(RelFileNode) * nnodes);           /* non-local relations */
2670
2671         /* If it's a local relation, it's localbuf.c's problem. */
2672         for (i = 0; i < nnodes; i++)
2673         {
2674                 if (RelFileNodeBackendIsTemp(rnodes[i]))
2675                 {
2676                         if (rnodes[i].backend == MyBackendId)
2677                                 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2678                 }
2679                 else
2680                         nodes[n++] = rnodes[i].node;
2681         }
2682
2683         /*
2684          * If there are no non-local relations, then we're done. Release the
2685          * memory and return.
2686          */
2687         if (n == 0)
2688         {
2689                 pfree(nodes);
2690                 return;
2691         }
2692
2693         /*
2694          * For low number of relations to drop just use a simple walk through, to
2695          * save the bsearch overhead. The threshold to use is rather a guess than
2696          * an exactly determined value, as it depends on many factors (CPU and RAM
2697          * speeds, amount of shared buffers etc.).
2698          */
2699         use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
2700
2701         /* sort the list of rnodes if necessary */
2702         if (use_bsearch)
2703                 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2704
2705         for (i = 0; i < NBuffers; i++)
2706         {
2707                 RelFileNode *rnode = NULL;
2708                 BufferDesc *bufHdr = GetBufferDescriptor(i);
2709
2710                 /*
2711                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2712                  * and saves some cycles.
2713                  */
2714
2715                 if (!use_bsearch)
2716                 {
2717                         int                     j;
2718
2719                         for (j = 0; j < n; j++)
2720                         {
2721                                 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
2722                                 {
2723                                         rnode = &nodes[j];
2724                                         break;
2725                                 }
2726                         }
2727                 }
2728                 else
2729                 {
2730                         rnode = bsearch((const void *) &(bufHdr->tag.rnode),
2731                                                         nodes, n, sizeof(RelFileNode),
2732                                                         rnode_comparator);
2733                 }
2734
2735                 /* buffer doesn't belong to any of the given relfilenodes; skip it */
2736                 if (rnode == NULL)
2737                         continue;
2738
2739                 LockBufHdr(bufHdr);
2740                 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
2741                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2742                 else
2743                         UnlockBufHdr(bufHdr);
2744         }
2745
2746         pfree(nodes);
2747 }
2748
2749 /* ---------------------------------------------------------------------
2750  *              DropDatabaseBuffers
2751  *
2752  *              This function removes all the buffers in the buffer cache for a
2753  *              particular database.  Dirty pages are simply dropped, without
2754  *              bothering to write them out first.  This is used when we destroy a
2755  *              database, to avoid trying to flush data to disk when the directory
2756  *              tree no longer exists.  Implementation is pretty similar to
2757  *              DropRelFileNodeBuffers() which is for destroying just one relation.
2758  * --------------------------------------------------------------------
2759  */
2760 void
2761 DropDatabaseBuffers(Oid dbid)
2762 {
2763         int                     i;
2764
2765         /*
2766          * We needn't consider local buffers, since by assumption the target
2767          * database isn't our own.
2768          */
2769
2770         for (i = 0; i < NBuffers; i++)
2771         {
2772                 BufferDesc *bufHdr = GetBufferDescriptor(i);
2773
2774                 /*
2775                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2776                  * and saves some cycles.
2777                  */
2778                 if (bufHdr->tag.rnode.dbNode != dbid)
2779                         continue;
2780
2781                 LockBufHdr(bufHdr);
2782                 if (bufHdr->tag.rnode.dbNode == dbid)
2783                         InvalidateBuffer(bufHdr);       /* releases spinlock */
2784                 else
2785                         UnlockBufHdr(bufHdr);
2786         }
2787 }
2788
2789 /* -----------------------------------------------------------------
2790  *              PrintBufferDescs
2791  *
2792  *              this function prints all the buffer descriptors, for debugging
2793  *              use only.
2794  * -----------------------------------------------------------------
2795  */
2796 #ifdef NOT_USED
2797 void
2798 PrintBufferDescs(void)
2799 {
2800         int                     i;
2801
2802         for (i = 0; i < NBuffers; ++i)
2803         {
2804                 BufferDesc *buf = GetBufferDescriptor(i);
2805                 Buffer          b = BufferDescriptorGetBuffer(buf);
2806
2807                 /* theoretically we should lock the bufhdr here */
2808                 elog(LOG,
2809                          "[%02d] (freeNext=%d, rel=%s, "
2810                          "blockNum=%u, flags=0x%x, refcount=%u %d)",
2811                          i, buf->freeNext,
2812                   relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
2813                          buf->tag.blockNum, buf->flags,
2814                          buf->refcount, GetPrivateRefCount(b));
2815         }
2816 }
2817 #endif
2818
2819 #ifdef NOT_USED
2820 void
2821 PrintPinnedBufs(void)
2822 {
2823         int                     i;
2824
2825         for (i = 0; i < NBuffers; ++i)
2826         {
2827                 BufferDesc *buf = GetBufferDescriptor(i);
2828                 Buffer          b = BufferDescriptorGetBuffer(buf);
2829
2830                 if (GetPrivateRefCount(b) > 0)
2831                 {
2832                         /* theoretically we should lock the bufhdr here */
2833                         elog(LOG,
2834                                  "[%02d] (freeNext=%d, rel=%s, "
2835                                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
2836                                  i, buf->freeNext,
2837                                  relpathperm(buf->tag.rnode, buf->tag.forkNum),
2838                                  buf->tag.blockNum, buf->flags,
2839                                  buf->refcount, GetPrivateRefCount(b));
2840                 }
2841         }
2842 }
2843 #endif
2844
2845 /* ---------------------------------------------------------------------
2846  *              FlushRelationBuffers
2847  *
2848  *              This function writes all dirty pages of a relation out to disk
2849  *              (or more accurately, out to kernel disk buffers), ensuring that the
2850  *              kernel has an up-to-date view of the relation.
2851  *
2852  *              Generally, the caller should be holding AccessExclusiveLock on the
2853  *              target relation to ensure that no other backend is busy dirtying
2854  *              more blocks of the relation; the effects can't be expected to last
2855  *              after the lock is released.
2856  *
2857  *              XXX currently it sequentially searches the buffer pool, should be
2858  *              changed to more clever ways of searching.  This routine is not
2859  *              used in any performance-critical code paths, so it's not worth
2860  *              adding additional overhead to normal paths to make it go faster;
2861  *              but see also DropRelFileNodeBuffers.
2862  * --------------------------------------------------------------------
2863  */
2864 void
2865 FlushRelationBuffers(Relation rel)
2866 {
2867         int                     i;
2868         BufferDesc *bufHdr;
2869
2870         /* Open rel at the smgr level if not already done */
2871         RelationOpenSmgr(rel);
2872
2873         if (RelationUsesLocalBuffers(rel))
2874         {
2875                 for (i = 0; i < NLocBuffer; i++)
2876                 {
2877                         bufHdr = GetLocalBufferDescriptor(i);
2878                         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2879                                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2880                         {
2881                                 ErrorContextCallback errcallback;
2882                                 Page            localpage;
2883
2884                                 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
2885
2886                                 /* Setup error traceback support for ereport() */
2887                                 errcallback.callback = local_buffer_write_error_callback;
2888                                 errcallback.arg = (void *) bufHdr;
2889                                 errcallback.previous = error_context_stack;
2890                                 error_context_stack = &errcallback;
2891
2892                                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
2893
2894                                 smgrwrite(rel->rd_smgr,
2895                                                   bufHdr->tag.forkNum,
2896                                                   bufHdr->tag.blockNum,
2897                                                   localpage,
2898                                                   false);
2899
2900                                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
2901
2902                                 /* Pop the error context stack */
2903                                 error_context_stack = errcallback.previous;
2904                         }
2905                 }
2906
2907                 return;
2908         }
2909
2910         /* Make sure we can handle the pin inside the loop */
2911         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2912
2913         for (i = 0; i < NBuffers; i++)
2914         {
2915                 bufHdr = GetBufferDescriptor(i);
2916
2917                 /*
2918                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2919                  * and saves some cycles.
2920                  */
2921                 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
2922                         continue;
2923
2924                 ReservePrivateRefCountEntry();
2925
2926                 LockBufHdr(bufHdr);
2927                 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2928                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2929                 {
2930                         PinBuffer_Locked(bufHdr);
2931                         LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2932                         FlushBuffer(bufHdr, rel->rd_smgr);
2933                         LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2934                         UnpinBuffer(bufHdr, true);
2935                 }
2936                 else
2937                         UnlockBufHdr(bufHdr);
2938         }
2939 }
2940
2941 /* ---------------------------------------------------------------------
2942  *              FlushDatabaseBuffers
2943  *
2944  *              This function writes all dirty pages of a database out to disk
2945  *              (or more accurately, out to kernel disk buffers), ensuring that the
2946  *              kernel has an up-to-date view of the database.
2947  *
2948  *              Generally, the caller should be holding an appropriate lock to ensure
2949  *              no other backend is active in the target database; otherwise more
2950  *              pages could get dirtied.
2951  *
2952  *              Note we don't worry about flushing any pages of temporary relations.
2953  *              It's assumed these wouldn't be interesting.
2954  * --------------------------------------------------------------------
2955  */
2956 void
2957 FlushDatabaseBuffers(Oid dbid)
2958 {
2959         int                     i;
2960         BufferDesc *bufHdr;
2961
2962         /* Make sure we can handle the pin inside the loop */
2963         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2964
2965         for (i = 0; i < NBuffers; i++)
2966         {
2967                 bufHdr = GetBufferDescriptor(i);
2968
2969                 /*
2970                  * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
2971                  * and saves some cycles.
2972                  */
2973                 if (bufHdr->tag.rnode.dbNode != dbid)
2974                         continue;
2975
2976                 ReservePrivateRefCountEntry();
2977
2978                 LockBufHdr(bufHdr);
2979                 if (bufHdr->tag.rnode.dbNode == dbid &&
2980                         (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2981                 {
2982                         PinBuffer_Locked(bufHdr);
2983                         LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2984                         FlushBuffer(bufHdr, NULL);
2985                         LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2986                         UnpinBuffer(bufHdr, true);
2987                 }
2988                 else
2989                         UnlockBufHdr(bufHdr);
2990         }
2991 }
2992
2993 /*
2994  * Flush a previously, shared or exclusively, locked and pinned buffer to the
2995  * OS.
2996  */
2997 void
2998 FlushOneBuffer(Buffer buffer)
2999 {
3000         BufferDesc *bufHdr;
3001
3002         /* currently not needed, but no fundamental reason not to support */
3003         Assert(!BufferIsLocal(buffer));
3004
3005         Assert(BufferIsPinned(buffer));
3006
3007         bufHdr = GetBufferDescriptor(buffer - 1);
3008
3009         Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3010
3011         FlushBuffer(bufHdr, NULL);
3012 }
3013
3014 /*
3015  * ReleaseBuffer -- release the pin on a buffer
3016  */
3017 void
3018 ReleaseBuffer(Buffer buffer)
3019 {
3020         if (!BufferIsValid(buffer))
3021                 elog(ERROR, "bad buffer ID: %d", buffer);
3022
3023         if (BufferIsLocal(buffer))
3024         {
3025                 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
3026
3027                 Assert(LocalRefCount[-buffer - 1] > 0);
3028                 LocalRefCount[-buffer - 1]--;
3029                 return;
3030         }
3031
3032         UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
3033 }
3034
3035 /*
3036  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
3037  *
3038  * This is just a shorthand for a common combination.
3039  */
3040 void
3041 UnlockReleaseBuffer(Buffer buffer)
3042 {
3043         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3044         ReleaseBuffer(buffer);
3045 }
3046
3047 /*
3048  * IncrBufferRefCount
3049  *              Increment the pin count on a buffer that we have *already* pinned
3050  *              at least once.
3051  *
3052  *              This function cannot be used on a buffer we do not have pinned,
3053  *              because it doesn't change the shared buffer state.
3054  */
3055 void
3056 IncrBufferRefCount(Buffer buffer)
3057 {
3058         Assert(BufferIsPinned(buffer));
3059         ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3060         ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
3061         if (BufferIsLocal(buffer))
3062                 LocalRefCount[-buffer - 1]++;
3063         else
3064         {
3065                 PrivateRefCountEntry *ref;
3066
3067                 ref = GetPrivateRefCountEntry(buffer, true);
3068                 Assert(ref != NULL);
3069                 ref->refcount++;
3070         }
3071 }
3072
3073 /*
3074  * MarkBufferDirtyHint
3075  *
3076  *      Mark a buffer dirty for non-critical changes.
3077  *
3078  * This is essentially the same as MarkBufferDirty, except:
3079  *
3080  * 1. The caller does not write WAL; so if checksums are enabled, we may need
3081  *        to write an XLOG_HINT WAL record to protect against torn pages.
3082  * 2. The caller might have only share-lock instead of exclusive-lock on the
3083  *        buffer's content lock.
3084  * 3. This function does not guarantee that the buffer is always marked dirty
3085  *        (due to a race condition), so it cannot be used for important changes.
3086  */
3087 void
3088 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
3089 {
3090         BufferDesc *bufHdr;
3091         Page            page = BufferGetPage(buffer);
3092
3093         if (!BufferIsValid(buffer))
3094                 elog(ERROR, "bad buffer ID: %d", buffer);
3095
3096         if (BufferIsLocal(buffer))
3097         {
3098                 MarkLocalBufferDirty(buffer);
3099                 return;
3100         }
3101
3102         bufHdr = GetBufferDescriptor(buffer - 1);
3103
3104         Assert(GetPrivateRefCount(buffer) > 0);
3105         /* here, either share or exclusive lock is OK */
3106         Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3107
3108         /*
3109          * This routine might get called many times on the same page, if we are
3110          * making the first scan after commit of an xact that added/deleted many
3111          * tuples. So, be as quick as we can if the buffer is already dirty.  We
3112          * do this by not acquiring spinlock if it looks like the status bits are
3113          * already set.  Since we make this test unlocked, there's a chance we
3114          * might fail to notice that the flags have just been cleared, and failed
3115          * to reset them, due to memory-ordering issues.  But since this function
3116          * is only intended to be used in cases where failing to write out the
3117          * data would be harmless anyway, it doesn't really matter.
3118          */
3119         if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
3120                 (BM_DIRTY | BM_JUST_DIRTIED))
3121         {
3122                 XLogRecPtr      lsn = InvalidXLogRecPtr;
3123                 bool            dirtied = false;
3124                 bool            delayChkpt = false;
3125
3126                 /*
3127                  * If we need to protect hint bit updates from torn writes, WAL-log a
3128                  * full page image of the page. This full page image is only necessary
3129                  * if the hint bit update is the first change to the page since the
3130                  * last checkpoint.
3131                  *
3132                  * We don't check full_page_writes here because that logic is included
3133                  * when we call XLogInsert() since the value changes dynamically.
3134                  */
3135                 if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
3136                 {
3137                         /*
3138                          * If we're in recovery we cannot dirty a page because of a hint.
3139                          * We can set the hint, just not dirty the page as a result so the
3140                          * hint is lost when we evict the page or shutdown.
3141                          *
3142                          * See src/backend/storage/page/README for longer discussion.
3143                          */
3144                         if (RecoveryInProgress())
3145                                 return;
3146
3147                         /*
3148                          * If the block is already dirty because we either made a change
3149                          * or set a hint already, then we don't need to write a full page
3150                          * image.  Note that aggressive cleaning of blocks dirtied by hint
3151                          * bit setting would increase the call rate. Bulk setting of hint
3152                          * bits would reduce the call rate...
3153                          *
3154                          * We must issue the WAL record before we mark the buffer dirty.
3155                          * Otherwise we might write the page before we write the WAL. That
3156                          * causes a race condition, since a checkpoint might occur between
3157                          * writing the WAL record and marking the buffer dirty. We solve
3158                          * that with a kluge, but one that is already in use during
3159                          * transaction commit to prevent race conditions. Basically, we
3160                          * simply prevent the checkpoint WAL record from being written
3161                          * until we have marked the buffer dirty. We don't start the
3162                          * checkpoint flush until we have marked dirty, so our checkpoint
3163                          * must flush the change to disk successfully or the checkpoint
3164                          * never gets written, so crash recovery will fix.
3165                          *
3166                          * It's possible we may enter here without an xid, so it is
3167                          * essential that CreateCheckpoint waits for virtual transactions
3168                          * rather than full transactionids.
3169                          */
3170                         MyPgXact->delayChkpt = delayChkpt = true;
3171                         lsn = XLogSaveBufferForHint(buffer, buffer_std);
3172                 }
3173
3174                 LockBufHdr(bufHdr);
3175                 Assert(bufHdr->refcount > 0);
3176                 if (!(bufHdr->flags & BM_DIRTY))
3177                 {
3178                         dirtied = true;         /* Means "will be dirtied by this action" */
3179
3180                         /*
3181                          * Set the page LSN if we wrote a backup block. We aren't supposed
3182                          * to set this when only holding a share lock but as long as we
3183                          * serialise it somehow we're OK. We choose to set LSN while
3184                          * holding the buffer header lock, which causes any reader of an
3185                          * LSN who holds only a share lock to also obtain a buffer header
3186                          * lock before using PageGetLSN(), which is enforced in
3187                          * BufferGetLSNAtomic().
3188                          *
3189                          * If checksums are enabled, you might think we should reset the
3190                          * checksum here. That will happen when the page is written
3191                          * sometime later in this checkpoint cycle.
3192                          */
3193                         if (!XLogRecPtrIsInvalid(lsn))
3194                                 PageSetLSN(page, lsn);
3195                 }
3196                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
3197                 UnlockBufHdr(bufHdr);
3198
3199                 if (delayChkpt)
3200                         MyPgXact->delayChkpt = false;
3201
3202                 if (dirtied)
3203                 {
3204                         VacuumPageDirty++;
3205                         pgBufferUsage.shared_blks_dirtied++;
3206                         if (VacuumCostActive)
3207                                 VacuumCostBalance += VacuumCostPageDirty;
3208                 }
3209         }
3210 }
3211
3212 /*
3213  * Release buffer content locks for shared buffers.
3214  *
3215  * Used to clean up after errors.
3216  *
3217  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
3218  * of releasing buffer content locks per se; the only thing we need to deal
3219  * with here is clearing any PIN_COUNT request that was in progress.
3220  */
3221 void
3222 UnlockBuffers(void)
3223 {
3224         BufferDesc *buf = PinCountWaitBuf;
3225
3226         if (buf)
3227         {
3228                 LockBufHdr(buf);
3229
3230                 /*
3231                  * Don't complain if flag bit not set; it could have been reset but we
3232                  * got a cancel/die interrupt before getting the signal.
3233                  */
3234                 if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
3235                         buf->wait_backend_pid == MyProcPid)
3236                         buf->flags &= ~BM_PIN_COUNT_WAITER;
3237
3238                 UnlockBufHdr(buf);
3239
3240                 PinCountWaitBuf = NULL;
3241         }
3242 }
3243
3244 /*
3245  * Acquire or release the content_lock for the buffer.
3246  */
3247 void
3248 LockBuffer(Buffer buffer, int mode)
3249 {
3250         BufferDesc *buf;
3251
3252         Assert(BufferIsValid(buffer));
3253         if (BufferIsLocal(buffer))
3254                 return;                                 /* local buffers need no lock */
3255
3256         buf = GetBufferDescriptor(buffer - 1);
3257
3258         if (mode == BUFFER_LOCK_UNLOCK)
3259                 LWLockRelease(BufferDescriptorGetContentLock(buf));
3260         else if (mode == BUFFER_LOCK_SHARE)
3261                 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
3262         else if (mode == BUFFER_LOCK_EXCLUSIVE)
3263                 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
3264         else
3265                 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
3266 }
3267
3268 /*
3269  * Acquire the content_lock for the buffer, but only if we don't have to wait.
3270  *
3271  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
3272  */
3273 bool
3274 ConditionalLockBuffer(Buffer buffer)
3275 {
3276         BufferDesc *buf;
3277
3278         Assert(BufferIsValid(buffer));
3279         if (BufferIsLocal(buffer))
3280                 return true;                    /* act as though we got it */
3281
3282         buf = GetBufferDescriptor(buffer - 1);
3283
3284         return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
3285                                                                         LW_EXCLUSIVE);
3286 }
3287
3288 /*
3289  * LockBufferForCleanup - lock a buffer in preparation for deleting items
3290  *
3291  * Items may be deleted from a disk page only when the caller (a) holds an
3292  * exclusive lock on the buffer and (b) has observed that no other backend
3293  * holds a pin on the buffer.  If there is a pin, then the other backend
3294  * might have a pointer into the buffer (for example, a heapscan reference
3295  * to an item --- see README for more details).  It's OK if a pin is added
3296  * after the cleanup starts, however; the newly-arrived backend will be
3297  * unable to look at the page until we release the exclusive lock.
3298  *
3299  * To implement this protocol, a would-be deleter must pin the buffer and
3300  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
3301  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
3302  * it has successfully observed pin count = 1.
3303  */
3304 void
3305 LockBufferForCleanup(Buffer buffer)
3306 {
3307         BufferDesc *bufHdr;
3308
3309         Assert(BufferIsValid(buffer));
3310         Assert(PinCountWaitBuf == NULL);
3311
3312         if (BufferIsLocal(buffer))
3313         {
3314                 /* There should be exactly one pin */
3315                 if (LocalRefCount[-buffer - 1] != 1)
3316                         elog(ERROR, "incorrect local pin count: %d",
3317                                  LocalRefCount[-buffer - 1]);
3318                 /* Nobody else to wait for */
3319                 return;
3320         }
3321
3322         /* There should be exactly one local pin */
3323         if (GetPrivateRefCount(buffer) != 1)
3324                 elog(ERROR, "incorrect local pin count: %d",
3325                          GetPrivateRefCount(buffer));
3326
3327         bufHdr = GetBufferDescriptor(buffer - 1);
3328
3329         for (;;)
3330         {
3331                 /* Try to acquire lock */
3332                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3333                 LockBufHdr(bufHdr);
3334                 Assert(bufHdr->refcount > 0);
3335                 if (bufHdr->refcount == 1)
3336                 {
3337                         /* Successfully acquired exclusive lock with pincount 1 */
3338                         UnlockBufHdr(bufHdr);
3339                         return;
3340                 }
3341                 /* Failed, so mark myself as waiting for pincount 1 */
3342                 if (bufHdr->flags & BM_PIN_COUNT_WAITER)
3343                 {
3344                         UnlockBufHdr(bufHdr);
3345                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3346                         elog(ERROR, "multiple backends attempting to wait for pincount 1");
3347                 }
3348                 bufHdr->wait_backend_pid = MyProcPid;
3349                 bufHdr->flags |= BM_PIN_COUNT_WAITER;
3350                 PinCountWaitBuf = bufHdr;
3351                 UnlockBufHdr(bufHdr);
3352                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3353
3354                 /* Wait to be signaled by UnpinBuffer() */
3355                 if (InHotStandby)
3356                 {
3357                         /* Publish the bufid that Startup process waits on */
3358                         SetStartupBufferPinWaitBufId(buffer - 1);
3359                         /* Set alarm and then wait to be signaled by UnpinBuffer() */
3360                         ResolveRecoveryConflictWithBufferPin();
3361                         /* Reset the published bufid */
3362                         SetStartupBufferPinWaitBufId(-1);
3363                 }
3364                 else
3365                         ProcWaitForSignal();
3366
3367                 /*
3368                  * Remove flag marking us as waiter. Normally this will not be set
3369                  * anymore, but ProcWaitForSignal() can return for other signals as
3370                  * well.  We take care to only reset the flag if we're the waiter, as
3371                  * theoretically another backend could have started waiting. That's
3372                  * impossible with the current usages due to table level locking, but
3373                  * better be safe.
3374                  */
3375                 LockBufHdr(bufHdr);
3376                 if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 &&
3377                         bufHdr->wait_backend_pid == MyProcPid)
3378                         bufHdr->flags &= ~BM_PIN_COUNT_WAITER;
3379                 UnlockBufHdr(bufHdr);
3380
3381                 PinCountWaitBuf = NULL;
3382                 /* Loop back and try again */
3383         }
3384 }
3385
3386 /*
3387  * Check called from RecoveryConflictInterrupt handler when Startup
3388  * process requests cancellation of all pin holders that are blocking it.
3389  */
3390 bool
3391 HoldingBufferPinThatDelaysRecovery(void)
3392 {
3393         int                     bufid = GetStartupBufferPinWaitBufId();
3394
3395         /*
3396          * If we get woken slowly then it's possible that the Startup process was
3397          * already woken by other backends before we got here. Also possible that
3398          * we get here by multiple interrupts or interrupts at inappropriate
3399          * times, so make sure we do nothing if the bufid is not set.
3400          */
3401         if (bufid < 0)
3402                 return false;
3403
3404         if (GetPrivateRefCount(bufid + 1) > 0)
3405                 return true;
3406
3407         return false;
3408 }
3409
3410 /*
3411  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
3412  *
3413  * We won't loop, but just check once to see if the pin count is OK.  If
3414  * not, return FALSE with no lock held.
3415  */
3416 bool
3417 ConditionalLockBufferForCleanup(Buffer buffer)
3418 {
3419         BufferDesc *bufHdr;
3420
3421         Assert(BufferIsValid(buffer));
3422
3423         if (BufferIsLocal(buffer))
3424         {
3425                 /* There should be exactly one pin */
3426                 Assert(LocalRefCount[-buffer - 1] > 0);
3427                 if (LocalRefCount[-buffer - 1] != 1)
3428                         return false;
3429                 /* Nobody else to wait for */
3430                 return true;
3431         }
3432
3433         /* There should be exactly one local pin */
3434         Assert(GetPrivateRefCount(buffer) > 0);
3435         if (GetPrivateRefCount(buffer) != 1)
3436                 return false;
3437
3438         /* Try to acquire lock */
3439         if (!ConditionalLockBuffer(buffer))
3440                 return false;
3441
3442         bufHdr = GetBufferDescriptor(buffer - 1);
3443         LockBufHdr(bufHdr);
3444         Assert(bufHdr->refcount > 0);
3445         if (bufHdr->refcount == 1)
3446         {
3447                 /* Successfully acquired exclusive lock with pincount 1 */
3448                 UnlockBufHdr(bufHdr);
3449                 return true;
3450         }
3451
3452         /* Failed, so release the lock */
3453         UnlockBufHdr(bufHdr);
3454         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3455         return false;
3456 }
3457
3458
3459 /*
3460  *      Functions for buffer I/O handling
3461  *
3462  *      Note: We assume that nested buffer I/O never occurs.
3463  *      i.e at most one io_in_progress lock is held per proc.
3464  *
3465  *      Also note that these are used only for shared buffers, not local ones.
3466  */
3467
3468 /*
3469  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
3470  */
3471 static void
3472 WaitIO(BufferDesc *buf)
3473 {
3474         /*
3475          * Changed to wait until there's no IO - Inoue 01/13/2000
3476          *
3477          * Note this is *necessary* because an error abort in the process doing
3478          * I/O could release the io_in_progress_lock prematurely. See
3479          * AbortBufferIO.
3480          */
3481         for (;;)
3482         {
3483                 BufFlags        sv_flags;
3484
3485                 /*
3486                  * It may not be necessary to acquire the spinlock to check the flag
3487                  * here, but since this test is essential for correctness, we'd better
3488                  * play it safe.
3489                  */
3490                 LockBufHdr(buf);
3491                 sv_flags = buf->flags;
3492                 UnlockBufHdr(buf);
3493                 if (!(sv_flags & BM_IO_IN_PROGRESS))
3494                         break;
3495                 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
3496                 LWLockRelease(BufferDescriptorGetIOLock(buf));
3497         }
3498 }
3499
3500 /*
3501  * StartBufferIO: begin I/O on this buffer
3502  *      (Assumptions)
3503  *      My process is executing no IO
3504  *      The buffer is Pinned
3505  *
3506  * In some scenarios there are race conditions in which multiple backends
3507  * could attempt the same I/O operation concurrently.  If someone else
3508  * has already started I/O on this buffer then we will block on the
3509  * io_in_progress lock until he's done.
3510  *
3511  * Input operations are only attempted on buffers that are not BM_VALID,
3512  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
3513  * so we can always tell if the work is already done.
3514  *
3515  * Returns TRUE if we successfully marked the buffer as I/O busy,
3516  * FALSE if someone else already did the work.
3517  */
3518 static bool
3519 StartBufferIO(BufferDesc *buf, bool forInput)
3520 {
3521         Assert(!InProgressBuf);
3522
3523         for (;;)
3524         {
3525                 /*
3526                  * Grab the io_in_progress lock so that other processes can wait for
3527                  * me to finish the I/O.
3528                  */
3529                 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3530
3531                 LockBufHdr(buf);
3532
3533                 if (!(buf->flags & BM_IO_IN_PROGRESS))
3534                         break;
3535
3536                 /*
3537                  * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3538                  * lock isn't held is if the process doing the I/O is recovering from
3539                  * an error (see AbortBufferIO).  If that's the case, we must wait for
3540                  * him to get unwedged.
3541                  */
3542                 UnlockBufHdr(buf);
3543                 LWLockRelease(BufferDescriptorGetIOLock(buf));
3544                 WaitIO(buf);
3545         }
3546
3547         /* Once we get here, there is definitely no I/O active on this buffer */
3548
3549         if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
3550         {
3551                 /* someone else already did the I/O */
3552                 UnlockBufHdr(buf);
3553                 LWLockRelease(BufferDescriptorGetIOLock(buf));
3554                 return false;
3555         }
3556
3557         buf->flags |= BM_IO_IN_PROGRESS;
3558
3559         UnlockBufHdr(buf);
3560
3561         InProgressBuf = buf;
3562         IsForInput = forInput;
3563
3564         return true;
3565 }
3566
3567 /*
3568  * TerminateBufferIO: release a buffer we were doing I/O on
3569  *      (Assumptions)
3570  *      My process is executing IO for the buffer
3571  *      BM_IO_IN_PROGRESS bit is set for the buffer
3572  *      We hold the buffer's io_in_progress lock
3573  *      The buffer is Pinned
3574  *
3575  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3576  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
3577  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
3578  * marking the buffer clean if it was re-dirtied while we were writing.
3579  *
3580  * set_flag_bits gets ORed into the buffer's flags.  It must include
3581  * BM_IO_ERROR in a failure case.  For successful completion it could
3582  * be 0, or BM_VALID if we just finished reading in the page.
3583  */
3584 static void
3585 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
3586 {
3587         Assert(buf == InProgressBuf);
3588
3589         LockBufHdr(buf);
3590
3591         Assert(buf->flags & BM_IO_IN_PROGRESS);
3592         buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3593         if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
3594                 buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3595         buf->flags |= set_flag_bits;
3596
3597         UnlockBufHdr(buf);
3598
3599         InProgressBuf = NULL;
3600
3601         LWLockRelease(BufferDescriptorGetIOLock(buf));
3602 }
3603
3604 /*
3605  * AbortBufferIO: Clean up any active buffer I/O after an error.
3606  *
3607  *      All LWLocks we might have held have been released,
3608  *      but we haven't yet released buffer pins, so the buffer is still pinned.
3609  *
3610  *      If I/O was in progress, we always set BM_IO_ERROR, even though it's
3611  *      possible the error condition wasn't related to the I/O.
3612  */
3613 void
3614 AbortBufferIO(void)
3615 {
3616         BufferDesc *buf = InProgressBuf;
3617
3618         if (buf)
3619         {
3620                 /*
3621                  * Since LWLockReleaseAll has already been called, we're not holding
3622                  * the buffer's io_in_progress_lock. We have to re-acquire it so that
3623                  * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3624                  * buffer will be in a busy spin until we succeed in doing this.
3625                  */
3626                 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3627
3628                 LockBufHdr(buf);
3629                 Assert(buf->flags & BM_IO_IN_PROGRESS);
3630                 if (IsForInput)
3631                 {
3632                         Assert(!(buf->flags & BM_DIRTY));
3633                         /* We'd better not think buffer is valid yet */
3634                         Assert(!(buf->flags & BM_VALID));
3635                         UnlockBufHdr(buf);
3636                 }
3637                 else
3638                 {
3639                         BufFlags        sv_flags;
3640
3641                         sv_flags = buf->flags;
3642                         Assert(sv_flags & BM_DIRTY);
3643                         UnlockBufHdr(buf);
3644                         /* Issue notice if this is not the first failure... */
3645                         if (sv_flags & BM_IO_ERROR)
3646                         {
3647                                 /* Buffer is pinned, so we can read tag without spinlock */
3648                                 char       *path;
3649
3650                                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
3651                                 ereport(WARNING,
3652                                                 (errcode(ERRCODE_IO_ERROR),
3653                                                  errmsg("could not write block %u of %s",
3654                                                                 buf->tag.blockNum, path),
3655                                                  errdetail("Multiple failures --- write error might be permanent.")));
3656                                 pfree(path);
3657                         }
3658                 }
3659                 TerminateBufferIO(buf, false, BM_IO_ERROR);
3660         }
3661 }
3662
3663 /*
3664  * Error context callback for errors occurring during shared buffer writes.
3665  */
3666 static void
3667 shared_buffer_write_error_callback(void *arg)
3668 {
3669         BufferDesc *bufHdr = (BufferDesc *) arg;
3670
3671         /* Buffer is pinned, so we can read the tag without locking the spinlock */
3672         if (bufHdr != NULL)
3673         {
3674                 char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
3675
3676                 errcontext("writing block %u of relation %s",
3677                                    bufHdr->tag.blockNum, path);
3678                 pfree(path);
3679         }
3680 }
3681
3682 /*
3683  * Error context callback for errors occurring during local buffer writes.
3684  */
3685 static void
3686 local_buffer_write_error_callback(void *arg)
3687 {
3688         BufferDesc *bufHdr = (BufferDesc *) arg;
3689
3690         if (bufHdr != NULL)
3691         {
3692                 char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
3693                                                                                   bufHdr->tag.forkNum);
3694
3695                 errcontext("writing block %u of relation %s",
3696                                    bufHdr->tag.blockNum, path);
3697                 pfree(path);
3698         }
3699 }
3700
3701 /*
3702  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
3703  */
3704 static int
3705 rnode_comparator(const void *p1, const void *p2)
3706 {
3707         RelFileNode n1 = *(RelFileNode *) p1;
3708         RelFileNode n2 = *(RelFileNode *) p2;
3709
3710         if (n1.relNode < n2.relNode)
3711                 return -1;
3712         else if (n1.relNode > n2.relNode)
3713                 return 1;
3714
3715         if (n1.dbNode < n2.dbNode)
3716                 return -1;
3717         else if (n1.dbNode > n2.dbNode)
3718                 return 1;
3719
3720         if (n1.spcNode < n2.spcNode)
3721                 return -1;
3722         else if (n1.spcNode > n2.spcNode)
3723                 return 1;
3724         else
3725                 return 0;
3726 }