]> granicus.if.org Git - postgresql/blob - src/backend/storage/buffer/bufmgr.c
566627d883b284498c7a44bde147b30b4c65e26a
[postgresql] / src / backend / storage / buffer / bufmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c--
4  *        buffer manager interface routines
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.31 1998/01/07 21:04:49 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /*
15  *
16  * BufferAlloc() -- lookup a buffer in the buffer table.  If
17  *              it isn't there add it, but do not read it into memory.
18  *              This is used when we are about to reinitialize the
19  *              buffer so don't care what the current disk contents are.
20  *              BufferAlloc() pins the new buffer in memory.
21  *
22  * ReadBuffer() -- same as BufferAlloc() but reads the data
23  *              on a buffer cache miss.
24  *
25  * ReleaseBuffer() -- unpin the buffer
26  *
27  * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
28  *              but don't unpin.  The disk IO is delayed until buffer
29  *              replacement if WriteMode is BUFFER_LATE_WRITE.
30  *
31  * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
32  *
33  * FlushBuffer() -- as above but never delayed write.
34  *
35  * BufferSync() -- flush all dirty buffers in the buffer pool.
36  *
37  * InitBufferPool() -- Init the buffer module.
38  *
39  * See other files:
40  *              freelist.c -- chooses victim for buffer replacement
41  *              buf_table.c -- manages the buffer lookup table
42  */
43 #include <sys/types.h>
44 #include <sys/file.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <math.h>
48 #include <signal.h>
49
50 #include "postgres.h"
51
52 /* declarations split between these three files */
53 #include "storage/buf.h"
54 #include "storage/buf_internals.h"
55 #include "storage/bufmgr.h"
56
57 #include "storage/fd.h"
58 #include "storage/ipc.h"
59 #include "storage/s_lock.h"
60 #include "storage/shmem.h"
61 #include "storage/spin.h"
62 #include "storage/smgr.h"
63 #include "storage/lmgr.h"
64 #include "miscadmin.h"
65 #include "utils/builtins.h"
66 #include "utils/hsearch.h"
67 #include "utils/palloc.h"
68 #include "utils/memutils.h"
69 #include "utils/relcache.h"
70 #include "executor/execdebug.h" /* for NDirectFileRead */
71 #include "catalog/catalog.h"
72
/*
 * Spinlock taken in this file around buffer-table lookups and around
 * updates of shared buffer descriptor flags and reference counts.
 */
extern SPINLOCK BufMgrLock;

/*
 * Statistics counters defined elsewhere.  In this file the Read*Count
 * counters are bumped once per read request and the *HitCount counters
 * once per request satisfied from a buffer already in memory.  The
 * *FlushCount counters are presumably bumped on forced writes -- their
 * use is not visible in this part of the file.
 */
extern long int ReadBufferCount;
extern long int ReadLocalBufferCount;
extern long int BufferHitCount;
extern long int LocalBufferHitCount;
extern long int BufferFlushCount;
extern long int LocalBufferFlushCount;

/*
 * Write policy consulted by WriteBuffer(): BUFFER_LATE_WRITE only marks
 * the buffer dirty, BUFFER_FLUSH_WRITE forces it out through FlushBuffer().
 */
static int      WriteMode = BUFFER_LATE_WRITE;          /* Delayed write is
                                                                                                 * default */

/* wait for another backend's I/O on 'buf' to finish */
static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);

#ifndef HAS_TEST_AND_SET
/* wake up waiters on 'buf'; only needed when there is no test-and-set */
static void SignalIO(BufferDesc *buf);
extern long *NWaitIOBackendP;   /* defined in buf_init.c */

#endif                                                  /* HAS_TEST_AND_SET */

/* ReadBuffer() worker; 'bufferLockHeld' says BufMgrLock is already taken */
static Buffer
ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
                                                 bool bufferLockHeld);
/* find or assign a shared buffer for a block, without reading it */
static BufferDesc *
BufferAlloc(Relation reln, BlockNumber blockNum,
                        bool *foundPtr, bool bufferLockHeld);
static int      FlushBuffer(Buffer buffer, bool release);
static void BufferSync(void);
static int      BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld);

/* not static but used by vacuum only ... */
int BlowawayRelationBuffers(Relation rdesc, BlockNumber block);
104
105 /* ---------------------------------------------------
106  * RelationGetBufferWithBuffer
107  *              see if the given buffer is what we want
108  *              if yes, we don't need to bother the buffer manager
109  * ---------------------------------------------------
110  */
111 Buffer
112 RelationGetBufferWithBuffer(Relation relation,
113                                                         BlockNumber blockNumber,
114                                                         Buffer buffer)
115 {
116         BufferDesc *bufHdr;
117         LRelId          lrelId;
118
119         if (BufferIsValid(buffer))
120         {
121                 if (!BufferIsLocal(buffer))
122                 {
123                         bufHdr = &BufferDescriptors[buffer - 1];
124                         lrelId = RelationGetLRelId(relation);
125                         SpinAcquire(BufMgrLock);
126                         if (bufHdr->tag.blockNum == blockNumber &&
127                                 bufHdr->tag.relId.relId == lrelId.relId &&
128                                 bufHdr->tag.relId.dbId == lrelId.dbId)
129                         {
130                                 SpinRelease(BufMgrLock);
131                                 return (buffer);
132                         }
133                         return (ReadBufferWithBufferLock(relation, blockNumber, true));
134                 }
135                 else
136                 {
137                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
138                         if (bufHdr->tag.relId.relId == relation->rd_id &&
139                                 bufHdr->tag.blockNum == blockNumber)
140                         {
141                                 return (buffer);
142                         }
143                 }
144         }
145         return (ReadBuffer(relation, blockNumber));
146 }
147
148 /*
149  * ReadBuffer -- returns a buffer containing the requested
150  *              block of the requested relation.  If the blknum
151  *              requested is P_NEW, extend the relation file and
152  *              allocate a new block.
153  *
 * Returns: the buffer number for the buffer containing
 *              the block read, or InvalidBuffer on an error.
156  *
157  * Assume when this function is called, that reln has been
158  *              opened already.
159  */
160
161 extern int      ShowPinTrace;
162
163
164 #undef ReadBuffer                               /* conflicts with macro when BUFMGR_DEBUG
165                                                                  * defined */
166
167 /*
168  * ReadBuffer --
169  *
170  */
171 Buffer
172 ReadBuffer(Relation reln, BlockNumber blockNum)
173 {
174         return ReadBufferWithBufferLock(reln, blockNum, false);
175 }
176
/*
 * is_userbuffer
 *              True when the shared buffer's recorded relation name is not
 *              a system relation name.  'buffer' is used directly as a
 *              1-based index into BufferDescriptors.
 *
 * XXX caller must have already acquired BufMgrLock
 */
#ifdef NOT_USED
static bool
is_userbuffer(Buffer buffer)
{
        BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];

        return IsSystemRelationName(bufHdr->sb_relname) ? false : true;
}

#endif
194
#ifdef NOT_USED
/*
 * ReadBuffer_Debug
 *              ReadBuffer() variant that also logs the pin to stderr.
 *
 * 'file' and 'line' identify the call site and are only printed.
 * A trace line is emitted when ShowPinTrace is set and the result is a
 * shared buffer that is_userbuffer() accepts.
 */
Buffer
ReadBuffer_Debug(char *file,
                                 int line,
                                 Relation reln,
                                 BlockNumber blockNum)
{
        Buffer          result = ReadBufferWithBufferLock(reln, blockNum, false);
        BufferDesc *hdr;

        /* nothing to trace: tracing off, local buffer, or system relation */
        if (!ShowPinTrace || BufferIsLocal(result) || !is_userbuffer(result))
                return result;

        hdr = &BufferDescriptors[result - 1];
        fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
                        result, hdr->sb_relname, hdr->tag.blockNum,
                        PrivateRefCount[result - 1], file, line);

        return result;
}

#endif
218
219 /*
220  * ReadBufferWithBufferLock -- does the work of
221  *              ReadBuffer() but with the possibility that
222  *              the buffer lock has already been held. this
223  *              is yet another effort to reduce the number of
224  *              semops in the system.
225  */
226 static Buffer
227 ReadBufferWithBufferLock(Relation reln,
228                                                  BlockNumber blockNum,
229                                                  bool bufferLockHeld)
230 {
231         BufferDesc *bufHdr;
232         int                     extend;                 /* extending the file by one block */
233         int                     status;
234         bool            found;
235         bool            isLocalBuf;
236
237         extend = (blockNum == P_NEW);
238         isLocalBuf = reln->rd_islocal;
239
240         if (isLocalBuf)
241         {
242                 ReadLocalBufferCount++;
243                 bufHdr = LocalBufferAlloc(reln, blockNum, &found);
244                 if (found)
245                         LocalBufferHitCount++;
246         }
247         else
248         {
249                 ReadBufferCount++;
250
251                 /*
252                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested
253                  * block is not currently in memory.
254                  */
255                 bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
256                 if (found)
257                         BufferHitCount++;
258         }
259
260         if (!bufHdr)
261         {
262                 return (InvalidBuffer);
263         }
264
265         /* if its already in the buffer pool, we're done */
266         if (found)
267         {
268
269                 /*
270                  * This happens when a bogus buffer was returned previously and is
271                  * floating around in the buffer pool.  A routine calling this
272                  * would want this extended.
273                  */
274                 if (extend)
275                 {
276                         /* new buffers are zero-filled */
277                         MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
278                         smgrextend(DEFAULT_SMGR, reln,
279                                            (char *) MAKE_PTR(bufHdr->data));
280                 }
281                 return (BufferDescriptorGetBuffer(bufHdr));
282
283         }
284
285         /*
286          * if we have gotten to this point, the reln pointer must be ok and
287          * the relation file must be open.
288          */
289         if (extend)
290         {
291                 /* new buffers are zero-filled */
292                 MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
293                 status = smgrextend(DEFAULT_SMGR, reln,
294                                                         (char *) MAKE_PTR(bufHdr->data));
295         }
296         else
297         {
298                 status = smgrread(DEFAULT_SMGR, reln, blockNum,
299                                                   (char *) MAKE_PTR(bufHdr->data));
300         }
301
302         if (isLocalBuf)
303                 return (BufferDescriptorGetBuffer(bufHdr));
304
305         /* lock buffer manager again to update IO IN PROGRESS */
306         SpinAcquire(BufMgrLock);
307
308         if (status == SM_FAIL)
309         {
310                 /* IO Failed.  cleanup the data structures and go home */
311
312                 if (!BufTableDelete(bufHdr))
313                 {
314                         SpinRelease(BufMgrLock);
315                         elog(FATAL, "BufRead: buffer table broken after IO error\n");
316                 }
317                 /* remember that BufferAlloc() pinned the buffer */
318                 UnpinBuffer(bufHdr);
319
320                 /*
321                  * Have to reset the flag so that anyone waiting for the buffer
322                  * can tell that the contents are invalid.
323                  */
324                 bufHdr->flags |= BM_IO_ERROR;
325                 bufHdr->flags &= ~BM_IO_IN_PROGRESS;
326         }
327         else
328         {
329                 /* IO Succeeded.  clear the flags, finish buffer update */
330
331                 bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
332         }
333
334         /* If anyone was waiting for IO to complete, wake them up now */
335 #ifdef HAS_TEST_AND_SET
336         S_UNLOCK(&(bufHdr->io_in_progress_lock));
337 #else
338         if (bufHdr->refcount > 1)
339                 SignalIO(bufHdr);
340 #endif
341
342         SpinRelease(BufMgrLock);
343
344         if (status == SM_FAIL)
345                 return (InvalidBuffer);
346
347         return (BufferDescriptorGetBuffer(bufHdr));
348 }
349
/*
 * BufferAlloc -- Get a buffer from the buffer pool but don't
 *              read it.
 *
 * reln                         relation the block belongs to (assumed open)
 * blockNum                     block wanted; P_NEW is replaced by the current
 *                                      block count reported by smgrnblocks()
 * foundPtr                     out: TRUE if the block was already in the buffer
 *                                      table and no I/O error was flagged on it,
 *                                      FALSE otherwise
 * bufferLockHeld       true if the caller already holds BufMgrLock
 *
 * Returns: descriptor for buffer, pinned; or NULL if GetFreeBuffer()
 *              returned no buffer.
 *
 * On a miss (*foundPtr == FALSE) the returned buffer has been entered in
 * the lookup table under the new tag and has BM_IO_IN_PROGRESS set; the
 * caller is expected to do the I/O and clear the flag.
 *
 * When this routine returns, the BufMgrLock is guaranteed NOT to be held.
 * NOTE(review): the "return (NULL)" path below does not itself call
 * SpinRelease(); whether the lock is still held there depends on what
 * GetFreeBuffer()/elog() did -- confirm.
 */
static BufferDesc *
BufferAlloc(Relation reln,
                        BlockNumber blockNum,
                        bool *foundPtr,
                        bool bufferLockHeld)
{
        BufferDesc *buf,
                           *buf2;
        BufferTag       newTag;                 /* identity of requested block */
        bool            inProgress;             /* buffer undergoing IO */
        bool            newblock = FALSE;       /* set below but not read again here */

        /* create a new tag so we can lookup the buffer */
        /* assume that the relation is already open */
        if (blockNum == P_NEW)
        {
                newblock = TRUE;
                blockNum = smgrnblocks(DEFAULT_SMGR, reln);
        }

        INIT_BUFFERTAG(&newTag, reln, blockNum);

        if (!bufferLockHeld)
                SpinAcquire(BufMgrLock);

        /* see if the block is in the buffer pool already */
        buf = BufTableLookup(&newTag);
        if (buf != NULL)
        {

                /*
                 * Found it.  Now, (a) pin the buffer so no one steals it from the
                 * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
                 * the buffer into the buffer pool.
                 */

                PinBuffer(buf);
                inProgress = (buf->flags & BM_IO_IN_PROGRESS);

                *foundPtr = TRUE;
                if (inProgress)
                {
                        WaitIO(buf, BufMgrLock);
                        if (buf->flags & BM_IO_ERROR)
                        {

                                /*
                                 * weird race condition:
                                 *
                                 * We were waiting for someone else to read the buffer. While
                                 * we were waiting, the reader boof'd in some way, so the
                                 * contents of the buffer are still invalid.  By saying
                                 * that we didn't find it, we can make the caller
                                 * reinitialize the buffer.  If two processes are waiting
                                 * for this block, both will read the block.  The second
                                 * one to finish may overwrite any updates made by the
                                 * first.  (Assume higher level synchronization prevents
                                 * this from happening).
                                 *
                                 * This is never going to happen, don't worry about it.
                                 */
                                *foundPtr = FALSE;
                        }
                }
#ifdef BMTRACE
                _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), reln->rd_id, blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
#endif                                                  /* BMTRACE */

                SpinRelease(BufMgrLock);

                return (buf);
        }

        *foundPtr = FALSE;

        /*
         * Didn't find it in the buffer pool.  We'll have to initialize a new
         * buffer.      First, grab one from the free list.  If it's dirty, flush
         * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
         *
         * The loop repeats for as long as buf is NULL, i.e. whenever the
         * candidate buffer had to be given up again.
         */
        inProgress = FALSE;
        for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
        {

                /* GetFreeBuffer will abort if it can't find a free buffer */
                buf = GetFreeBuffer();

                /*
                 * But it can return buf == NULL if we are in aborting transaction
                 * now and so elog(ERROR,...) in GetFreeBuffer will not abort
                 * again.
                 */
                if (buf == NULL)
                        return (NULL);

                /*
                 * There should be exactly one pin on the buffer after it is
                 * allocated -- ours.  If it had a pin it wouldn't have been on
                 * the free list.  No one else could have pinned it between
                 * GetFreeBuffer and here because we have the BufMgrLock.
                 */
                Assert(buf->refcount == 0);
                buf->refcount = 1;
                PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;

                if (buf->flags & BM_DIRTY)
                {
                        bool            smok;   /* result of BufferReplace() */

                        /*
                         * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
                         * with the contents of the buffer while we write it out. We
                         * don't really care if they try to read it, but if they can
                         * complete a BufferAlloc on it they can then scribble into
                         * it, and we'd really like to avoid that while we are
                         * flushing the buffer.  Setting this flag should block them
                         * in WaitIO until we're done.
                         */
                        inProgress = TRUE;
                        buf->flags |= BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET

                        /*
                         * All code paths that acquire this lock pin the buffer first;
                         * since no one had it pinned (it just came off the free
                         * list), no one else can have this lock.
                         */
                        Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
                        S_LOCK(&(buf->io_in_progress_lock));
#endif                                                  /* HAS_TEST_AND_SET */

                        /*
                         * Write the buffer out, being careful to release BufMgrLock
                         * before starting the I/O.
                         *
                         * This #ifndef is here because a few extra semops REALLY kill
                         * you on machines that don't have spinlocks.  If you don't
                         * operate with much concurrency, well...
                         */
                        smok = BufferReplace(buf, true);
#ifndef OPTIMIZE_SINGLE
                        SpinAcquire(BufMgrLock);
#endif                                                  /* OPTIMIZE_SINGLE */

                        if (smok == FALSE)
                        {
                                /*
                                 * The write failed: flag the error, release the I/O
                                 * lock or signal waiters, drop our pin and give the
                                 * buffer up so that the loop picks another one.
                                 */
                                elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
                                         buf->tag.blockNum, buf->sb_dbname, buf->sb_relname);
                                inProgress = FALSE;
                                buf->flags |= BM_IO_ERROR;
                                buf->flags &= ~BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET
                                S_UNLOCK(&(buf->io_in_progress_lock));
#else                                                   /* !HAS_TEST_AND_SET */
                                if (buf->refcount > 1)
                                        SignalIO(buf);
#endif                                                  /* !HAS_TEST_AND_SET */
                                PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
                                buf->refcount--;
                                if (buf->refcount == 0)
                                {
                                        AddBufferToFreelist(buf);
                                        buf->flags |= BM_FREE;
                                }
                                buf = (BufferDesc *) NULL;
                        }
                        else
                        {

                                /*
                                 * BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
                                 * be set by anyone.            - vadim 01/17/97
                                 */
                                if (buf->flags & BM_JUST_DIRTIED)
                                {
                                        elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing",
                                                 buf->tag.blockNum, buf->sb_relname);
                                }
                                else
                                {
                                        buf->flags &= ~BM_DIRTY;
                                }
                        }

                        /*
                         * Somebody could have pinned the buffer while we were doing
                         * the I/O and had given up the BufMgrLock (though they would
                         * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
                         * That's why this is a loop -- if so, we need to clear the
                         * I/O flags, remove our pin and start all over again.
                         *
                         * People may be making buffers free at any time, so there's no
                         * reason to think that we have an immediate disaster on our
                         * hands.
                         */
                        if (buf && buf->refcount > 1)
                        {
                                inProgress = FALSE;
                                buf->flags &= ~BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET
                                S_UNLOCK(&(buf->io_in_progress_lock));
#else                                                   /* !HAS_TEST_AND_SET */
                                if (buf->refcount > 1)
                                        SignalIO(buf);
#endif                                                  /* !HAS_TEST_AND_SET */
                                PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
                                buf->refcount--;
                                buf = (BufferDesc *) NULL;
                        }

                        /*
                         * Somebody could have allocated another buffer for the same
                         * block we are about to read in. (While we flush out the
                         * dirty buffer, we don't hold the lock and someone could have
                         * allocated another buffer for the same block. The problem is
                         * we haven't gotten around to insert the new tag into the
                         * buffer table. So we need to check here.)             -ay 3/95
                         */
                        buf2 = BufTableLookup(&newTag);
                        if (buf2 != NULL)
                        {

                                /*
                                 * Found it. Someone has already done what we're about to
                                 * do. We'll just handle this as if it were found in the
                                 * buffer pool in the first place.
                                 */
                                if (buf != NULL)
                                {
#ifdef HAS_TEST_AND_SET
                                        S_UNLOCK(&(buf->io_in_progress_lock));
#else                                                   /* !HAS_TEST_AND_SET */
                                        if (buf->refcount > 1)
                                                SignalIO(buf);
#endif                                                  /* !HAS_TEST_AND_SET */

                                        /* give up the buffer since we don't need it any more */
                                        buf->refcount--;
                                        PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
                                        AddBufferToFreelist(buf);
                                        buf->flags |= BM_FREE;
                                        buf->flags &= ~BM_IO_IN_PROGRESS;
                                }

                                /* same handling as the "found" case at the top */
                                PinBuffer(buf2);
                                inProgress = (buf2->flags & BM_IO_IN_PROGRESS);

                                *foundPtr = TRUE;
                                if (inProgress)
                                {
                                        WaitIO(buf2, BufMgrLock);
                                        if (buf2->flags & BM_IO_ERROR)
                                        {
                                                *foundPtr = FALSE;
                                        }
                                }

                                SpinRelease(BufMgrLock);

                                return (buf2);
                        }
                }
        }

        /*
         * At this point we should have the sole pin on a non-dirty buffer and
         * we may or may not already have the BM_IO_IN_PROGRESS flag set.
         */

        /*
         * Change the name of the buffer in the lookup table:
         *
         * Need to update the lookup table before the read starts. If someone
         * comes along looking for the buffer while we are reading it in, we
         * don't want them to allocate a new buffer.  For the same reason, we
         * didn't want to erase the buf table entry for the buffer we were
         * writing back until now, either.
         */

        if (!BufTableDelete(buf))
        {
                SpinRelease(BufMgrLock);
                elog(FATAL, "buffer wasn't in the buffer table\n");

        }

        /* record the database name and relation name for this buffer */
        strcpy(buf->sb_relname, reln->rd_rel->relname.data);
        strcpy(buf->sb_dbname, GetDatabaseName());

        INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
        if (!BufTableInsert(buf))
        {
                SpinRelease(BufMgrLock);
                elog(FATAL, "Buffer in lookup table twice \n");
        }

        /*
         * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
         * so no one fiddles with them until the read completes.  If this
         * routine has been called simply to allocate a buffer, no io will be
         * attempted, so the flag isn't set.
         *
         * If we flushed a dirty buffer above and kept it, inProgress is
         * still TRUE and the flag (and I/O lock) are already ours.
         */
        if (!inProgress)
        {
                buf->flags |= BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET
                Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
                S_LOCK(&(buf->io_in_progress_lock));
#endif                                                  /* HAS_TEST_AND_SET */
        }

#ifdef BMTRACE
        _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), reln->rd_id, blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
#endif                                                  /* BMTRACE */

        SpinRelease(BufMgrLock);

        return (buf);
}
678
679 /*
680  * WriteBuffer--
681  *
682  *              Pushes buffer contents to disk if WriteMode is BUFFER_FLUSH_WRITE.
683  *              Otherwise, marks contents as dirty.
684  *
685  * Assume that buffer is pinned.  Assume that reln is
686  *              valid.
687  *
688  * Side Effects:
689  *              Pin count is decremented.
690  */
691
692 #undef WriteBuffer
693
694 int
695 WriteBuffer(Buffer buffer)
696 {
697         BufferDesc *bufHdr;
698
699         if (WriteMode == BUFFER_FLUSH_WRITE)
700         {
701                 return (FlushBuffer(buffer, TRUE));
702         }
703         else
704         {
705
706                 if (BufferIsLocal(buffer))
707                         return WriteLocalBuffer(buffer, TRUE);
708
709                 if (BAD_BUFFER_ID(buffer))
710                         return (FALSE);
711
712                 bufHdr = &BufferDescriptors[buffer - 1];
713
714                 SpinAcquire(BufMgrLock);
715                 Assert(bufHdr->refcount > 0);
716                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
717                 UnpinBuffer(bufHdr);
718                 SpinRelease(BufMgrLock);
719                 CommitInfoNeedsSave[buffer - 1] = 0;
720         }
721         return (TRUE);
722 }
723
#ifdef NOT_USED
/*
 * WriteBuffer_Debug
 *              WriteBuffer() variant that also logs the unpin to stderr.
 *
 * 'file' and 'line' identify the call site and are only printed.
 * A trace line is emitted when ShowPinTrace is set and 'buffer' is a
 * shared buffer that is_userbuffer() accepts.
 */
void
WriteBuffer_Debug(char *file, int line, Buffer buffer)
{
        WriteBuffer(buffer);

        /*
         * Only shared buffers may be traced: BufferDescriptors and
         * PrivateRefCount are indexed with buffer - 1, which is out of range
         * for local buffers (those are numbered negatively).
         */
        if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
        {
                BufferDesc *buf;

                buf = &BufferDescriptors[buffer - 1];
                fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
                                buffer, buf->sb_relname, buf->tag.blockNum,
                                PrivateRefCount[buffer - 1], file, line);
        }
}

#endif
742
/*
 * DirtyBufferCopy() -- For a given dbid/relid/blockno, if the buffer is
 *                                              in the cache and is dirty, copy it to the
 *                                              requested location and mark it clean.
 *
 *              We treat this as a logical write.  It was installed to
 *              support the cache management code for write-once storage
 *              managers: the Sony jukebox storage manager agrees to take
 *              responsibility for the data once we mark it clean.
 *
 *              Nothing is copied when the block is not in the buffer table,
 *              or its buffer is not both BM_DIRTY and BM_VALID.
 *
 *      NOTE: used by sony jukebox code in postgres 4.2   - ay 2/95
 */
#ifdef NOT_USED
void
DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest)
{
        BufferTag       wanted;
        BufferDesc *hdr;

        wanted.relId.relId = relid;
        wanted.relId.dbId = dbid;
        wanted.blockNum = blkno;

        SpinAcquire(BufMgrLock);
        hdr = BufTableLookup(&wanted);

        if (hdr != (BufferDesc *) NULL
                && (hdr->flags & BM_DIRTY)
                && (hdr->flags & BM_VALID))
        {
                /*
                 * hate to do this holding the lock, but release and reacquire
                 * is slower
                 */
                memmove(dest, (char *) MAKE_PTR(hdr->data), BLCKSZ);

                hdr->flags &= ~BM_DIRTY;
        }

        SpinRelease(BufMgrLock);
}

#endif
795
796 /*
797  * FlushBuffer -- like WriteBuffer, but force the page to disk.
798  *
799  * 'buffer' is known to be dirty/pinned, so there should not be a
800  * problem reading the BufferDesc members without the BufMgrLock
801  * (nobody should be able to change tags, flags, etc. out from under
802  * us).
803  */
804 static int
805 FlushBuffer(Buffer buffer, bool release)
806 {
807         BufferDesc *bufHdr;
808         Oid                     bufdb;
809         Relation        bufrel;
810         int                     status;
811
812         if (BufferIsLocal(buffer))
813                 return FlushLocalBuffer(buffer, release);
814
815         if (BAD_BUFFER_ID(buffer))
816                 return (STATUS_ERROR);
817
818         bufHdr = &BufferDescriptors[buffer - 1];
819         bufdb = bufHdr->tag.relId.dbId;
820
821         Assert(bufdb == MyDatabaseId || bufdb == (Oid) NULL);
822         bufrel = RelationIdCacheGetRelation(bufHdr->tag.relId.relId);
823         Assert(bufrel != (Relation) NULL);
824
825         /* To check if block content changed while flushing. - vadim 01/17/97 */
826         SpinAcquire(BufMgrLock);
827         bufHdr->flags &= ~BM_JUST_DIRTIED;
828         SpinRelease(BufMgrLock);
829
830         status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
831                                            (char *) MAKE_PTR(bufHdr->data));
832         
833         RelationDecrementReferenceCount(bufrel);
834
835         if (status == SM_FAIL)
836         {
837                 elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s",
838                          bufHdr->tag.blockNum, bufHdr->sb_relname);
839                 return (STATUS_ERROR);
840         }
841         BufferFlushCount++;
842
843         SpinAcquire(BufMgrLock);
844
845         /*
846          * If this buffer was marked by someone as DIRTY while we were
847          * flushing it out we must not clear DIRTY flag - vadim 01/17/97
848          */
849         if (bufHdr->flags & BM_JUST_DIRTIED)
850         {
851                 elog(NOTICE, "FlusfBuffer: content of block %u (%s) changed while flushing",
852                          bufHdr->tag.blockNum, bufHdr->sb_relname);
853         }
854         else
855         {
856                 bufHdr->flags &= ~BM_DIRTY;
857         }
858         if (release)
859                 UnpinBuffer(bufHdr);
860         SpinRelease(BufMgrLock);
861         CommitInfoNeedsSave[buffer - 1] = 0;
862
863         return (STATUS_OK);
864 }
865
866 /*
867  * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
868  *                                                 when the operation is complete.
869  *
870  *              We know that the buffer is for a relation in our private cache,
871  *              because this routine is called only to write out buffers that
872  *              were changed by the executing backend.
873  */
874 int
875 WriteNoReleaseBuffer(Buffer buffer)
876 {
877         BufferDesc *bufHdr;
878
879         if (WriteMode == BUFFER_FLUSH_WRITE)
880         {
881                 return (FlushBuffer(buffer, FALSE));
882         }
883         else
884         {
885
886                 if (BufferIsLocal(buffer))
887                         return WriteLocalBuffer(buffer, FALSE);
888
889                 if (BAD_BUFFER_ID(buffer))
890                         return (STATUS_ERROR);
891
892                 bufHdr = &BufferDescriptors[buffer - 1];
893
894                 SpinAcquire(BufMgrLock);
895                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
896                 SpinRelease(BufMgrLock);
897                 CommitInfoNeedsSave[buffer - 1] = 0;
898         }
899         return (STATUS_OK);
900 }
901
902
903 #undef ReleaseAndReadBuffer
904 /*
905  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
906  *              so that only one semop needs to be called.
907  *
908  */
909 Buffer
910 ReleaseAndReadBuffer(Buffer buffer,
911                                          Relation relation,
912                                          BlockNumber blockNum)
913 {
914         BufferDesc *bufHdr;
915         Buffer          retbuf;
916
917         if (BufferIsLocal(buffer))
918         {
919                 Assert(LocalRefCount[-buffer - 1] > 0);
920                 LocalRefCount[-buffer - 1]--;
921         }
922         else
923         {
924                 if (BufferIsValid(buffer))
925                 {
926                         bufHdr = &BufferDescriptors[buffer - 1];
927                         Assert(PrivateRefCount[buffer - 1] > 0);
928                         PrivateRefCount[buffer - 1]--;
929                         if (PrivateRefCount[buffer - 1] == 0 &&
930                                 LastRefCount[buffer - 1] == 0)
931                         {
932
933                                 /*
934                                  * only release buffer if it is not pinned in previous
935                                  * ExecMain level
936                                  */
937                                 SpinAcquire(BufMgrLock);
938                                 bufHdr->refcount--;
939                                 if (bufHdr->refcount == 0)
940                                 {
941                                         AddBufferToFreelist(bufHdr);
942                                         bufHdr->flags |= BM_FREE;
943                                 }
944                                 if (CommitInfoNeedsSave[buffer - 1])
945                                 {
946                                         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
947                                         CommitInfoNeedsSave[buffer - 1] = 0;
948                                 }
949                                 retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
950                                 return retbuf;
951                         }
952                 }
953         }
954
955         return (ReadBuffer(relation, blockNum));
956 }
957
958 /*
959  * BufferSync -- Flush all dirty buffers in the pool.
960  *
961  *              This is called at transaction commit time.      It does the wrong thing,
962  *              right now.      We should flush only our own changes to stable storage,
963  *              and we should obey the lock protocol on the buffer manager metadata
964  *              as we do it.  Also, we need to be sure that no other transaction is
965  *              modifying the page as we flush it.      This is only a problem for objects
966  *              that use a non-two-phase locking protocol, like btree indices.  For
967  *              those objects, we would like to set a write lock for the duration of
968  *              our IO.  Another possibility is to code updates to btree pages
969  *              carefully, so that writing them out out of order cannot cause
970  *              any unrecoverable errors.
971  *
972  *              I don't want to think hard about this right now, so I will try
973  *              to come back to it later.
974  */
975 static void
976 BufferSync()
977 {
978         int                     i;
979         Oid                     bufdb;
980         Oid                     bufrel;
981         Relation        reln;
982         BufferDesc *bufHdr;
983         int                     status;
984
985         SpinAcquire(BufMgrLock);
986         for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
987         {
988                 if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
989                 {
990                         bufdb = bufHdr->tag.relId.dbId;
991                         bufrel = bufHdr->tag.relId.relId;
992                         if (bufdb == MyDatabaseId || bufdb == (Oid) 0)
993                         {
994                                 reln = RelationIdCacheGetRelation(bufrel);
995
996                                 /*
997                                  * We have to pin buffer to keep anyone from stealing it
998                                  * from the buffer pool while we are flushing it or
999                                  * waiting in WaitIO. It's bad for GetFreeBuffer in
1000                                  * BufferAlloc, but there is no other way to prevent
1001                                  * writing into disk block data from some other buffer,
1002                                  * getting smgr status of some other block and clearing
1003                                  * BM_DIRTY of ...                        - VAdim 09/16/96
1004                                  */
1005                                 PinBuffer(bufHdr);
1006                                 if (bufHdr->flags & BM_IO_IN_PROGRESS)
1007                                 {
1008                                         WaitIO(bufHdr, BufMgrLock);
1009                                         UnpinBuffer(bufHdr);
1010                                         if (bufHdr->flags & BM_IO_ERROR)
1011                                         {
1012                                                 elog(ERROR, "BufferSync: write error %u for %s",
1013                                                          bufHdr->tag.blockNum, bufHdr->sb_relname);
1014                                         }
1015                                         if (reln != (Relation) NULL)
1016                                                 RelationDecrementReferenceCount(reln);
1017                                         continue;
1018                                 }
1019
1020                                 /*
1021                                  * To check if block content changed while flushing (see
1022                                  * below). - vadim 01/17/97
1023                                  */
1024                                 bufHdr->flags &= ~BM_JUST_DIRTIED;
1025
1026                                 /*
1027                                  * If we didn't have the reldesc in our local cache, flush
1028                                  * this page out using the 'blind write' storage manager
1029                                  * routine.  If we did find it, use the standard
1030                                  * interface.
1031                                  */
1032
1033 #ifndef OPTIMIZE_SINGLE
1034                                 SpinRelease(BufMgrLock);
1035 #endif                                                  /* OPTIMIZE_SINGLE */
1036                                 if (reln == (Relation) NULL)
1037                                 {
1038                                         status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1039                                                                            bufHdr->sb_relname, bufdb, bufrel,
1040                                                                                   bufHdr->tag.blockNum,
1041                                                                                 (char *) MAKE_PTR(bufHdr->data));
1042                                 }
1043                                 else
1044                                 {
1045                                         status = smgrwrite(DEFAULT_SMGR, reln,
1046                                                                            bufHdr->tag.blockNum,
1047                                                                            (char *) MAKE_PTR(bufHdr->data));
1048                                 }
1049 #ifndef OPTIMIZE_SINGLE
1050                                 SpinAcquire(BufMgrLock);
1051 #endif                                                  /* OPTIMIZE_SINGLE */
1052
1053                                 UnpinBuffer(bufHdr);
1054                                 if (status == SM_FAIL)
1055                                 {
1056                                         bufHdr->flags |= BM_IO_ERROR;
1057                                         elog(ERROR, "BufferSync: cannot write %u for %s",
1058                                                  bufHdr->tag.blockNum, bufHdr->sb_relname);
1059                                 }
1060                                 BufferFlushCount++;
1061
1062                                 /*
1063                                  * If this buffer was marked by someone as DIRTY while we
1064                                  * were flushing it out we must not clear DIRTY flag -
1065                                  * vadim 01/17/97
1066                                  */
1067                                 if (!(bufHdr->flags & BM_JUST_DIRTIED))
1068                                         bufHdr->flags &= ~BM_DIRTY;
1069                                 if (reln != (Relation) NULL)
1070                                         RelationDecrementReferenceCount(reln);
1071                         }
1072                 }
1073         }
1074         SpinRelease(BufMgrLock);
1075
1076         LocalBufferSync();
1077 }
1078
1079
1080 /*
1081  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf'
1082  *              is cleared.  Because IO_IN_PROGRESS conflicts are
1083  *              expected to be rare, there is only one BufferIO
1084  *              lock in the entire system.      All processes block
1085  *              on this semaphore when they try to use a buffer
1086  *              that someone else is faulting in.  Whenever a
1087  *              process finishes an IO and someone is waiting for
1088  *              the buffer, BufferIO is signaled (SignalIO).  All
1089  *              waiting processes then wake up and check to see
1090  *              if their buffer is now ready.  This implementation
1091  *              is simple, but efficient enough if WaitIO is
1092  *              rarely called by multiple processes simultaneously.
1093  *
1094  *      ProcSleep atomically releases the spinlock and goes to
1095  *              sleep.
1096  *
1097  *      Note: there is an easy fix if the queue becomes long.
1098  *              save the id of the buffer we are waiting for in
1099  *              the queue structure.  That way signal can figure
1100  *              out which proc to wake up.
1101  */
1102 #ifdef HAS_TEST_AND_SET
1103 static void
1104 WaitIO(BufferDesc *buf, SPINLOCK spinlock)
1105 {
1106         SpinRelease(spinlock);
1107         S_LOCK(&(buf->io_in_progress_lock));
1108         S_UNLOCK(&(buf->io_in_progress_lock));
1109         SpinAcquire(spinlock);
1110 }
1111
1112 #else                                                   /* HAS_TEST_AND_SET */
1113 IpcSemaphoreId WaitIOSemId;
1114
1115 static void
1116 WaitIO(BufferDesc *buf, SPINLOCK spinlock)
1117 {
1118         bool            inProgress;
1119
1120         for (;;)
1121         {
1122
1123                 /* wait until someone releases IO lock */
1124                 (*NWaitIOBackendP)++;
1125                 SpinRelease(spinlock);
1126                 IpcSemaphoreLock(WaitIOSemId, 0, 1);
1127                 SpinAcquire(spinlock);
1128                 inProgress = (buf->flags & BM_IO_IN_PROGRESS);
1129                 if (!inProgress)
1130                         break;
1131         }
1132 }
1133
1134 /*
1135  * SignalIO --
1136  */
1137 static void
1138 SignalIO(BufferDesc *buf)
1139 {
1140         /* somebody better be waiting. */
1141         Assert(buf->refcount > 1);
1142         IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
1143         *NWaitIOBackendP = 0;
1144 }
1145
1146 #endif                                                  /* HAS_TEST_AND_SET */
1147
/*
 * Some I/O's are direct file access and bypass the bufmgr
 * (e.g., I/O in psort and hashjoin).  These two counters are reported by
 * PrintBufferUsage() and zeroed by ResetBufferUsage().
 */
long		NDirectFileRead;
long		NDirectFileWrite;
1151
1152 void
1153 PrintBufferUsage(FILE *statfp)
1154 {
1155         float           hitrate;
1156         float           localhitrate;
1157
1158         if (ReadBufferCount == 0)
1159                 hitrate = 0.0;
1160         else
1161                 hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;
1162
1163         if (ReadLocalBufferCount == 0)
1164                 localhitrate = 0.0;
1165         else
1166                 localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;
1167
1168         fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
1169                         ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
1170         fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
1171                         ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
1172         fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
1173                         NDirectFileRead, NDirectFileWrite);
1174 }
1175
1176 void
1177 ResetBufferUsage()
1178 {
1179         BufferHitCount = 0;
1180         ReadBufferCount = 0;
1181         BufferFlushCount = 0;
1182         LocalBufferHitCount = 0;
1183         ReadLocalBufferCount = 0;
1184         LocalBufferFlushCount = 0;
1185         NDirectFileRead = 0;
1186         NDirectFileWrite = 0;
1187 }
1188
1189 /* ----------------------------------------------
1190  *              ResetBufferPool
1191  *
1192  *              this routine is supposed to be called when a transaction aborts.
1193  *              it will release all the buffer pins held by the transaciton.
1194  *
1195  * ----------------------------------------------
1196  */
1197 void
1198 ResetBufferPool()
1199 {
1200         register int i;
1201
1202         for (i = 1; i <= NBuffers; i++)
1203         {
1204                 CommitInfoNeedsSave[i - 1] = 0;
1205                 if (BufferIsValid(i))
1206                 {
1207                         while (PrivateRefCount[i - 1] > 0)
1208                         {
1209                                 ReleaseBuffer(i);
1210                         }
1211                 }
1212                 LastRefCount[i - 1] = 0;
1213         }
1214
1215         ResetLocalBufferPool();
1216 }
1217
1218 /* -----------------------------------------------
1219  *              BufferPoolCheckLeak
1220  *
1221  *              check if there is buffer leak
1222  *
1223  * -----------------------------------------------
1224  */
1225 int
1226 BufferPoolCheckLeak()
1227 {
1228         register int i;
1229         int                     error = 0;
1230
1231         for (i = 1; i <= NBuffers; i++)
1232         {
1233                 if (BufferIsValid(i))
1234                 {
1235                         elog(NOTICE,
1236                         "buffer leak [%d] detected in BufferPoolCheckLeak()", i - 1);
1237                         error = 1;
1238                 }
1239         }
1240         if (error)
1241         {
1242                 PrintBufferDescs();
1243                 return (1);
1244         }
1245         return (0);
1246 }
1247
/* ------------------------------------------------
 *		FlushBufferPool
 *
 *		Flush all dirty blocks in the buffer pool to disk (BufferSync)
 *		and then call smgrcommit().  Nothing at all is done when
 *		StableMainMemoryFlag is nonzero.
 *
 * ------------------------------------------------
 */
void
FlushBufferPool(int StableMainMemoryFlag)
{
	if (StableMainMemoryFlag)
		return;

	BufferSync();
	smgrcommit();
}
1264
1265 /*
1266  * BufferIsValid --
1267  *              True iff the refcnt of the local buffer is > 0
1268  * Note:
1269  *              BufferIsValid(InvalidBuffer) is False.
1270  *              BufferIsValid(UnknownBuffer) is False.
1271  */
1272 bool
1273 BufferIsValid(Buffer bufnum)
1274 {
1275         if (BufferIsLocal(bufnum))
1276                 return (bufnum >= -NLocBuffer && LocalRefCount[-bufnum - 1] > 0);
1277
1278         if (BAD_BUFFER_ID(bufnum))
1279                 return (false);
1280
1281         return ((bool) (PrivateRefCount[bufnum - 1] > 0));
1282 }
1283
1284 /*
1285  * BufferGetBlockNumber --
1286  *              Returns the block number associated with a buffer.
1287  *
1288  * Note:
1289  *              Assumes that the buffer is valid.
1290  */
1291 BlockNumber
1292 BufferGetBlockNumber(Buffer buffer)
1293 {
1294         Assert(BufferIsValid(buffer));
1295
1296         /* XXX should be a critical section */
1297         if (BufferIsLocal(buffer))
1298                 return (LocalBufferDescriptors[-buffer - 1].tag.blockNum);
1299         else
1300                 return (BufferDescriptors[buffer - 1].tag.blockNum);
1301 }
1302
1303 /*
1304  * BufferGetRelation --
1305  *              Returns the relation desciptor associated with a buffer.
1306  *
1307  * Note:
1308  *              Assumes buffer is valid.
1309  */
1310 Relation
1311 BufferGetRelation(Buffer buffer)
1312 {
1313         Relation        relation;
1314         Oid                     relid;
1315
1316         Assert(BufferIsValid(buffer));
1317         Assert(!BufferIsLocal(buffer));         /* not supported for local buffers */
1318
1319         /* XXX should be a critical section */
1320         relid = LRelIdGetRelationId(BufferDescriptors[buffer - 1].tag.relId);
1321         relation = RelationIdGetRelation(relid);
1322
1323         RelationDecrementReferenceCount(relation);
1324
1325         if (RelationHasReferenceCountZero(relation))
1326         {
1327
1328                 /*
1329                  * elog(NOTICE, "BufferGetRelation: 0->1");
1330                  */
1331
1332                 RelationIncrementReferenceCount(relation);
1333         }
1334
1335         return (relation);
1336 }
1337
1338 /*
1339  * BufferReplace
1340  *
1341  * Flush the buffer corresponding to 'bufHdr'
1342  *
1343  */
1344 static int
1345 BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
1346 {
1347         Relation        reln;
1348         Oid                     bufdb,
1349                                 bufrel;
1350         int                     status;
1351
1352         if (!bufferLockHeld)
1353                 SpinAcquire(BufMgrLock);
1354
1355         /*
1356          * first try to find the reldesc in the cache, if no luck, don't
1357          * bother to build the reldesc from scratch, just do a blind write.
1358          */
1359
1360         bufdb = bufHdr->tag.relId.dbId;
1361         bufrel = bufHdr->tag.relId.relId;
1362
1363         if (bufdb == MyDatabaseId || bufdb == (Oid) NULL)
1364                 reln = RelationIdCacheGetRelation(bufrel);
1365         else
1366                 reln = (Relation) NULL;
1367
1368         /* To check if block content changed while flushing. - vadim 01/17/97 */
1369         bufHdr->flags &= ~BM_JUST_DIRTIED;
1370
1371         SpinRelease(BufMgrLock);
1372
1373         if (reln != (Relation) NULL)
1374         {
1375                 status = smgrflush(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
1376                                                    (char *) MAKE_PTR(bufHdr->data));
1377         }
1378         else
1379         {
1380
1381                 /* blind write always flushes */
1382                 status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1383                                                           bufHdr->sb_relname, bufdb, bufrel,
1384                                                           bufHdr->tag.blockNum,
1385                                                           (char *) MAKE_PTR(bufHdr->data));
1386         }
1387         
1388         if (reln != (Relation) NULL)
1389                 RelationDecrementReferenceCount(reln);
1390
1391         if (status == SM_FAIL)
1392                 return (FALSE);
1393
1394         BufferFlushCount++;
1395
1396         return (TRUE);
1397 }
1398
1399 /*
1400  * RelationGetNumberOfBlocks --
1401  *              Returns the buffer descriptor associated with a page in a relation.
1402  *
1403  * Note:
1404  *              XXX may fail for huge relations.
1405  *              XXX should be elsewhere.
1406  *              XXX maybe should be hidden
1407  */
1408 BlockNumber
1409 RelationGetNumberOfBlocks(Relation relation)
1410 {
1411         return
1412         ((relation->rd_islocal) ? relation->rd_nblocks :
1413          smgrnblocks(DEFAULT_SMGR, relation));
1414 }
1415
1416 /*
1417  * BufferGetBlock --
1418  *              Returns a reference to a disk page image associated with a buffer.
1419  *
1420  * Note:
1421  *              Assumes buffer is valid.
1422  */
1423 Block
1424 BufferGetBlock(Buffer buffer)
1425 {
1426         Assert(BufferIsValid(buffer));
1427
1428         if (BufferIsLocal(buffer))
1429                 return ((Block) MAKE_PTR(LocalBufferDescriptors[-buffer - 1].data));
1430         else
1431                 return ((Block) MAKE_PTR(BufferDescriptors[buffer - 1].data));
1432 }
1433
1434 /* ---------------------------------------------------------------------
1435  *              ReleaseRelationBuffers
1436  *
1437  *              this function unmarks all the dirty pages of a relation
1438  *              in the buffer pool so that at the end of transaction
1439  *              these pages will not be flushed.
1440  *              XXX currently it sequentially searches the buffer pool, should be
1441  *              changed to more clever ways of searching.
1442  * --------------------------------------------------------------------
1443  */
1444 void
1445 ReleaseRelationBuffers(Relation rdesc)
1446 {
1447         register int i;
1448         int                     holding = 0;
1449         BufferDesc *buf;
1450
1451         if (rdesc->rd_islocal)
1452         {
1453                 for (i = 0; i < NLocBuffer; i++)
1454                 {
1455                         buf = &LocalBufferDescriptors[i];
1456                         if ((buf->flags & BM_DIRTY) &&
1457                                 (buf->tag.relId.relId == rdesc->rd_id))
1458                         {
1459                                 buf->flags &= ~BM_DIRTY;
1460                         }
1461                 }
1462                 return;
1463         }
1464
1465         for (i = 1; i <= NBuffers; i++)
1466         {
1467                 buf = &BufferDescriptors[i - 1];
1468                 if (!holding)
1469                 {
1470                         SpinAcquire(BufMgrLock);
1471                         holding = 1;
1472                 }
1473                 if ((buf->flags & BM_DIRTY) &&
1474                         (buf->tag.relId.dbId == MyDatabaseId) &&
1475                         (buf->tag.relId.relId == rdesc->rd_id))
1476                 {
1477                         buf->flags &= ~BM_DIRTY;
1478                         if (!(buf->flags & BM_FREE))
1479                         {
1480                                 SpinRelease(BufMgrLock);
1481                                 holding = 0;
1482                                 ReleaseBuffer(i);
1483                         }
1484                 }
1485         }
1486         if (holding)
1487                 SpinRelease(BufMgrLock);
1488 }
1489
1490 /* ---------------------------------------------------------------------
1491  *              DropBuffers
1492  *
1493  *              This function marks all the buffers in the buffer cache for a
1494  *              particular database as clean.  This is used when we destroy a
1495  *              database, to avoid trying to flush data to disk when the directory
1496  *              tree no longer exists.
1497  *
1498  *              This is an exceedingly non-public interface.
1499  * --------------------------------------------------------------------
1500  */
1501 void
1502 DropBuffers(Oid dbid)
1503 {
1504         register int i;
1505         BufferDesc *buf;
1506
1507         SpinAcquire(BufMgrLock);
1508         for (i = 1; i <= NBuffers; i++)
1509         {
1510                 buf = &BufferDescriptors[i - 1];
1511                 if ((buf->tag.relId.dbId == dbid) && (buf->flags & BM_DIRTY))
1512                 {
1513                         buf->flags &= ~BM_DIRTY;
1514                 }
1515         }
1516         SpinRelease(BufMgrLock);
1517 }
1518
1519 /* -----------------------------------------------------------------
1520  *              PrintBufferDescs
1521  *
1522  *              this function prints all the buffer descriptors, for debugging
1523  *              use only.
1524  * -----------------------------------------------------------------
1525  */
1526 void
1527 PrintBufferDescs()
1528 {
1529         int                     i;
1530         BufferDesc *buf = BufferDescriptors;
1531
1532         if (IsUnderPostmaster)
1533         {
1534                 SpinAcquire(BufMgrLock);
1535                 for (i = 0; i < NBuffers; ++i, ++buf)
1536                 {
1537                         elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1538 blockNum=%d, flags=0x%x, refcount=%d %d)",
1539                                  i, buf->freeNext, buf->freePrev,
1540                                  buf->sb_relname, buf->tag.blockNum, buf->flags,
1541                                  buf->refcount, PrivateRefCount[i]);
1542                 }
1543                 SpinRelease(BufMgrLock);
1544         }
1545         else
1546         {
1547                 /* interactive backend */
1548                 for (i = 0; i < NBuffers; ++i, ++buf)
1549                 {
1550                         printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
1551                                    i, buf->sb_relname, buf->tag.blockNum,
1552                                    buf->flags, buf->refcount, PrivateRefCount[i]);
1553                 }
1554         }
1555 }
1556
1557 void
1558 PrintPinnedBufs()
1559 {
1560         int                     i;
1561         BufferDesc *buf = BufferDescriptors;
1562
1563         SpinAcquire(BufMgrLock);
1564         for (i = 0; i < NBuffers; ++i, ++buf)
1565         {
1566                 if (PrivateRefCount[i] > 0)
1567                         elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1568 blockNum=%d, flags=0x%x, refcount=%d %d)\n",
1569                                  i, buf->freeNext, buf->freePrev, buf->sb_relname,
1570                                  buf->tag.blockNum, buf->flags,
1571                                  buf->refcount, PrivateRefCount[i]);
1572         }
1573         SpinRelease(BufMgrLock);
1574 }
1575
/*
 * BufferPoolBlowaway
 *
 * This routine is solely for the purpose of experiments -- sometimes
 * you may want to blow away whatever is left from the past in the buffer
 * pool and start measuring some performance with a clean empty buffer
 * pool.
 *
 * It syncs all dirty buffers, then for every shared buffer releases it
 * until BufferIsValid() is false and removes its descriptor from the
 * buffer table.
 */
#ifdef NOT_USED
void
BufferPoolBlowaway()
{
	register int bufid;

	BufferSync();
	for (bufid = 1; bufid <= NBuffers; bufid++)
	{
		/* drop every reference this backend still holds */
		while (BufferIsValid(bufid))
			ReleaseBuffer(bufid);

		BufTableDelete(&BufferDescriptors[bufid - 1]);
	}
}

#endif
1603
1604 /* ---------------------------------------------------------------------
1605  *              BlowawayRelationBuffers
1606  *
1607  *              This function blowaway all the pages with blocknumber >= passed
1608  *              of a relation in the buffer pool. Used by vacuum before truncation...
1609  *              
1610  *              Returns: 0 - Ok, -1 - DIRTY, -2 - PINNED
1611  *              
1612  *              XXX currently it sequentially searches the buffer pool, should be
1613  *              changed to more clever ways of searching.
1614  * --------------------------------------------------------------------
1615  */
1616 int
1617 BlowawayRelationBuffers(Relation rdesc, BlockNumber block)
1618 {
1619         register int    i;
1620         BufferDesc         *buf;
1621
1622         if (rdesc->rd_islocal)
1623         {
1624                 for (i = 0; i < NLocBuffer; i++)
1625                 {
1626                         buf = &LocalBufferDescriptors[i];
1627                         if (buf->tag.relId.relId == rdesc->rd_id && 
1628                                 buf->tag.blockNum >= block)
1629                         {
1630                                 if (buf->flags & BM_DIRTY)
1631                                 {
1632                                         elog (NOTICE, "BlowawayRelationBuffers(%s (local), %u): block %u is dirty",
1633                                                 rdesc->rd_rel->relname.data, block, buf->tag.blockNum);
1634                                         return (-1);
1635                                 }
1636                                 if (LocalRefCount[i] > 0)
1637                                 {
1638                                         elog (NOTICE, "BlowawayRelationBuffers(%s (local), %u): block %u is referenced (%d)",
1639                                                 rdesc->rd_rel->relname.data, block, 
1640                                                 buf->tag.blockNum, LocalRefCount[i]);
1641                                         return (-2);
1642                                 }
1643                                 buf->tag.relId.relId = InvalidOid;
1644                         }
1645                 }
1646                 return (0);
1647         }
1648
1649         SpinAcquire(BufMgrLock);
1650         for (i = 0; i < NBuffers; i++)
1651         {
1652                 buf = &BufferDescriptors[i];
1653                 if (buf->tag.relId.dbId == MyDatabaseId &&
1654                         buf->tag.relId.relId == rdesc->rd_id && 
1655                         buf->tag.blockNum >= block)
1656                 {
1657                         if (buf->flags & BM_DIRTY)
1658                         {
1659                                 elog (NOTICE, "BlowawayRelationBuffers(%s, %u): block %u is dirty (private %d, last %d, global %d)",
1660                                         buf->sb_relname, block, buf->tag.blockNum, 
1661                                         PrivateRefCount[i], LastRefCount[i], buf->refcount);
1662                                 SpinRelease(BufMgrLock);
1663                                 return (-1);
1664                         }
1665                         if (!(buf->flags & BM_FREE))
1666                         {
1667                                 elog (NOTICE, "BlowawayRelationBuffers(%s, %u): block %u is referenced (private %d, last %d, global %d)",
1668                                         buf->sb_relname, block, buf->tag.blockNum, 
1669                                         PrivateRefCount[i], LastRefCount[i], buf->refcount);
1670                                 SpinRelease(BufMgrLock);
1671                                 return (-2);
1672                         }
1673                         BufTableDelete(buf);
1674                 }
1675         }
1676         SpinRelease(BufMgrLock);
1677         return (0);
1678 }
1679
1680 #undef IncrBufferRefCount
1681 #undef ReleaseBuffer
1682
1683 void
1684 IncrBufferRefCount(Buffer buffer)
1685 {
1686         if (BufferIsLocal(buffer))
1687         {
1688                 Assert(LocalRefCount[-buffer - 1] >= 0);
1689                 LocalRefCount[-buffer - 1]++;
1690         }
1691         else
1692         {
1693                 Assert(!BAD_BUFFER_ID(buffer));
1694                 Assert(PrivateRefCount[buffer - 1] >= 0);
1695                 PrivateRefCount[buffer - 1]++;
1696         }
1697 }
1698
1699 /*
1700  * ReleaseBuffer -- remove the pin on a buffer without
1701  *              marking it dirty.
1702  *
1703  */
1704 int
1705 ReleaseBuffer(Buffer buffer)
1706 {
1707         BufferDesc *bufHdr;
1708
1709         if (BufferIsLocal(buffer))
1710         {
1711                 Assert(LocalRefCount[-buffer - 1] > 0);
1712                 LocalRefCount[-buffer - 1]--;
1713                 return (STATUS_OK);
1714         }
1715
1716         if (BAD_BUFFER_ID(buffer))
1717                 return (STATUS_ERROR);
1718
1719         bufHdr = &BufferDescriptors[buffer - 1];
1720
1721         Assert(PrivateRefCount[buffer - 1] > 0);
1722         PrivateRefCount[buffer - 1]--;
1723         if (PrivateRefCount[buffer - 1] == 0 && LastRefCount[buffer - 1] == 0)
1724         {
1725
1726                 /*
1727                  * only release buffer if it is not pinned in previous ExecMain
1728                  * levels
1729                  */
1730                 SpinAcquire(BufMgrLock);
1731                 bufHdr->refcount--;
1732                 if (bufHdr->refcount == 0)
1733                 {
1734                         AddBufferToFreelist(bufHdr);
1735                         bufHdr->flags |= BM_FREE;
1736                 }
1737                 if (CommitInfoNeedsSave[buffer - 1])
1738                 {
1739                         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1740                         CommitInfoNeedsSave[buffer - 1] = 0;
1741                 }
1742                 SpinRelease(BufMgrLock);
1743         }
1744
1745         return (STATUS_OK);
1746 }
1747
#ifdef NOT_USED
/*
 * IncrBufferRefCount_Debug -- IncrBufferRefCount() plus an optional trace.
 *
 * After incrementing the reference count, writes a "PIN(Incr)" line to
 * stderr when ShowPinTrace is set and the buffer is a non-local buffer
 * accepted by is_userbuffer().  'file' and 'line' identify the call site
 * and are only echoed in the trace line.
 */
void
IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
{
	BufferDesc *desc;

	IncrBufferRefCount(buffer);

	/* tracing off, local buffer, or not a user buffer: nothing to print */
	if (!ShowPinTrace || BufferIsLocal(buffer) || !is_userbuffer(buffer))
		return;

	desc = &BufferDescriptors[buffer - 1];

	fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
			buffer, desc->sb_relname, desc->tag.blockNum,
			PrivateRefCount[buffer - 1], file, line);
}

#endif
1765
#ifdef NOT_USED
/*
 * ReleaseBuffer_Debug -- ReleaseBuffer() plus an optional trace.
 *
 * After releasing the buffer (the status returned by ReleaseBuffer() is
 * not examined), writes an "UNPIN(Rel)" line to stderr when ShowPinTrace
 * is set and the buffer is a non-local buffer accepted by
 * is_userbuffer().  'file' and 'line' identify the call site and are only
 * echoed in the trace line.
 */
void
ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
{
	BufferDesc *desc;

	ReleaseBuffer(buffer);

	/* tracing off, local buffer, or not a user buffer: nothing to print */
	if (!ShowPinTrace || BufferIsLocal(buffer) || !is_userbuffer(buffer))
		return;

	desc = &BufferDescriptors[buffer - 1];

	fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
			buffer, desc->sb_relname, desc->tag.blockNum,
			PrivateRefCount[buffer - 1], file, line);
}

#endif
1783
#ifdef NOT_USED
/*
 * ReleaseAndReadBuffer_Debug -- ReleaseAndReadBuffer() plus optional traces.
 *
 * Calls ReleaseAndReadBuffer(buffer, relation, blockNum) and returns its
 * result.  When ShowPinTrace is set it also writes to stderr:
 *	- an "UNPIN(Rel&Rd)" line for the old buffer, if it was valid before
 *	  the call, and
 *	- a "PIN(Rel&Rd)" line for the buffer that was returned.
 *
 * Both traces index the shared BufferDescriptors/PrivateRefCount arrays,
 * so they are emitted only for non-local buffers (local buffer ids are
 * negative and would index out of bounds), and the PIN trace is checked
 * against the returned buffer 'b', which is the one it describes.
 *
 * 'file' and 'line' identify the call site and are only echoed in the
 * trace lines.
 */
int
ReleaseAndReadBuffer_Debug(char *file,
						   int line,
						   Buffer buffer,
						   Relation relation,
						   BlockNumber blockNum)
{
	bool		bufferValid;
	Buffer		b;

	/* remember validity now; the release below may change it */
	bufferValid = BufferIsValid(buffer);
	b = ReleaseAndReadBuffer(buffer, relation, blockNum);
	if (ShowPinTrace && bufferValid && !BufferIsLocal(buffer)
		&& is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				buffer, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
	if (ShowPinTrace && BufferIsValid(b) && !BufferIsLocal(b)
		&& is_userbuffer(b))
	{
		BufferDesc *buf = &BufferDescriptors[b - 1];

		fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n",
				b, buf->sb_relname, buf->tag.blockNum,
				PrivateRefCount[b - 1], file, line);
	}
	return b;
}

#endif
1820
1821 #ifdef BMTRACE
1822
1823 /*
1824  *      trace allocations and deallocations in a circular buffer in
1825  *      shared memory.  check the buffer before doing the allocation,
1826  *      and die if there's anything fishy.
1827  */
1828
1829 _bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
1830 {
1831         static int      mypid = 0;
1832         long            start,
1833                                 cur;
1834         bmtrace    *tb;
1835
1836         if (mypid == 0)
1837                 mypid = getpid();
1838
1839         start = *CurTraceBuf;
1840
1841         if (start > 0)
1842                 cur = start - 1;
1843         else
1844                 cur = BMT_LIMIT - 1;
1845
1846         for (;;)
1847         {
1848                 tb = &TraceBuf[cur];
1849                 if (tb->bmt_op != BMT_NOTUSED)
1850                 {
1851                         if (tb->bmt_buf == bufNo)
1852                         {
1853                                 if ((tb->bmt_op == BMT_DEALLOC)
1854                                         || (tb->bmt_dbid == dbId && tb->bmt_relid == relId
1855                                                 && tb->bmt_blkno == blkNo))
1856                                         goto okay;
1857
1858                                 /* die holding the buffer lock */
1859                                 _bm_die(dbId, relId, blkNo, bufNo, allocType, start, cur);
1860                         }
1861                 }
1862
1863                 if (cur == start)
1864                         goto okay;
1865
1866                 if (cur == 0)
1867                         cur = BMT_LIMIT - 1;
1868                 else
1869                         cur--;
1870         }
1871
1872 okay:
1873         tb = &TraceBuf[start];
1874         tb->bmt_pid = mypid;
1875         tb->bmt_buf = bufNo;
1876         tb->bmt_dbid = dbId;
1877         tb->bmt_relid = relId;
1878         tb->bmt_blkno = blkNo;
1879         tb->bmt_op = allocType;
1880
1881         *CurTraceBuf = (start + 1) % BMT_LIMIT;
1882 }
1883
1884 _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
1885                 int allocType, long start, long cur)
1886 {
1887         FILE       *fp;
1888         bmtrace    *tb;
1889         int                     i;
1890
1891         tb = &TraceBuf[cur];
1892
1893         if ((fp = AllocateFile("/tmp/death_notice", "w")) == NULL)
1894                 elog(FATAL, "buffer alloc trace error and can't open log file");
1895
1896         fprintf(fp, "buffer alloc trace detected the following error:\n\n");
1897         fprintf(fp, "    buffer %d being %s inconsistently with a previous %s\n\n",
1898                  bufNo, (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
1899                         (tb->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));
1900
1901         fprintf(fp, "the trace buffer contains:\n");
1902
1903         i = start;
1904         for (;;)
1905         {
1906                 tb = &TraceBuf[i];
1907                 if (tb->bmt_op != BMT_NOTUSED)
1908                 {
1909                         fprintf(fp, "     [%3d]%spid %d buf %2d for <%d,%d,%d> ",
1910                                         i, (i == cur ? " ---> " : "\t"),
1911                                         tb->bmt_pid, tb->bmt_buf,
1912                                         tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
1913
1914                         switch (tb->bmt_op)
1915                         {
1916                                 case BMT_ALLOCFND:
1917                                         fprintf(fp, "allocate (found)\n");
1918                                         break;
1919
1920                                 case BMT_ALLOCNOTFND:
1921                                         fprintf(fp, "allocate (not found)\n");
1922                                         break;
1923
1924                                 case BMT_DEALLOC:
1925                                         fprintf(fp, "deallocate\n");
1926                                         break;
1927
1928                                 default:
1929                                         fprintf(fp, "unknown op type %d\n", tb->bmt_op);
1930                                         break;
1931                         }
1932                 }
1933
1934                 i = (i + 1) % BMT_LIMIT;
1935                 if (i == start)
1936                         break;
1937         }
1938
1939         fprintf(fp, "\noperation causing error:\n");
1940         fprintf(fp, "\tpid %d buf %d for <%d,%d,%d> ",
1941                         getpid(), bufNo, dbId, relId, blkNo);
1942
1943         switch (allocType)
1944         {
1945                 case BMT_ALLOCFND:
1946                         fprintf(fp, "allocate (found)\n");
1947                         break;
1948
1949                 case BMT_ALLOCNOTFND:
1950                         fprintf(fp, "allocate (not found)\n");
1951                         break;
1952
1953                 case BMT_DEALLOC:
1954                         fprintf(fp, "deallocate\n");
1955                         break;
1956
1957                 default:
1958                         fprintf(fp, "unknown op type %d\n", allocType);
1959                         break;
1960         }
1961
1962         FreeFile(fp);
1963
1964         kill(getpid(), SIGILL);
1965 }
1966
1967 #endif                                                  /* BMTRACE */
1968
1969 void
1970 BufferRefCountReset(int *refcountsave)
1971 {
1972         int                     i;
1973
1974         for (i = 0; i < NBuffers; i++)
1975         {
1976                 refcountsave[i] = PrivateRefCount[i];
1977                 LastRefCount[i] += PrivateRefCount[i];
1978                 PrivateRefCount[i] = 0;
1979         }
1980 }
1981
1982 void
1983 BufferRefCountRestore(int *refcountsave)
1984 {
1985         int                     i;
1986
1987         for (i = 0; i < NBuffers; i++)
1988         {
1989                 PrivateRefCount[i] = refcountsave[i];
1990                 LastRefCount[i] -= refcountsave[i];
1991                 refcountsave[i] = 0;
1992         }
1993 }
1994
1995 int
1996 SetBufferWriteMode(int mode)
1997 {
1998         int                     old;
1999
2000         old = WriteMode;
2001         WriteMode = mode;
2002         return (old);
2003 }
2004
2005 void
2006 SetBufferCommitInfoNeedsSave(Buffer buffer)
2007 {
2008         if (!BufferIsLocal(buffer))
2009                 CommitInfoNeedsSave[buffer - 1]++;
2010 }