/*-------------------------------------------------------------------------
 *
 * hio.c
 *	  POSTGRES heap access method input/output code.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/hio.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/hio.h"
#include "access/htup_details.h"
#include "access/visibilitymap.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"


/*
 * RelationPutHeapTuple - place tuple at specified page
 *
 * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
 *
 * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
 */
void
RelationPutHeapTuple(Relation relation,
					 Buffer buffer,
					 HeapTuple tuple,
					 bool token)
{
	Page		pageHeader;
	OffsetNumber offnum;

	/*
	 * A tuple that's being inserted speculatively should already have its
	 * token set.
	 */
	Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));

	/* Add the tuple to the page */
	pageHeader = BufferGetPage(buffer);

	offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
						 tuple->t_len, InvalidOffsetNumber, false, true);

	if (offnum == InvalidOffsetNumber)
		elog(PANIC, "failed to add tuple to page");

	/* Update tuple->t_self to the actual position where it was stored */
	ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);

	/*
	 * Insert the correct position into CTID of the stored tuple, too (unless
	 * this is a speculative insertion, in which case the token is held in
	 * CTID field instead)
	 */
	if (!token)
	{
		ItemId		itemId = PageGetItemId(pageHeader, offnum);
		Item		item = PageGetItem(pageHeader, itemId);

		((HeapTupleHeader) item)->t_ctid = tuple->t_self;
	}
}
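
/*
 * Illustrative sketch (an editorial addition, not part of the original
 * file): the expected calling pattern, roughly as heap_insert uses it.  The
 * buffer comes back from RelationGetBufferForTuple already pinned and
 * exclusive-locked, and the tuple is placed inside a critical section:
 *
 *		buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
 *										   InvalidBuffer, options,
 *										   bistate, &vmbuffer, NULL);
 *		START_CRIT_SECTION();
 *		RelationPutHeapTuple(relation, buffer, heaptup, false);
 *		MarkBufferDirty(buffer);
 *		... write WAL for the insertion ...
 *		END_CRIT_SECTION();
 *		UnlockReleaseBuffer(buffer);
 *
 * Inside the critical section any failure is escalated to PANIC, which is
 * why ereport(ERROR) is disallowed in this function.
 */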

/*
 * Read in a buffer, using bulk-insert strategy if bistate isn't NULL.
 */
static Buffer
ReadBufferBI(Relation relation, BlockNumber targetBlock,
			 BulkInsertState bistate)
{
	Buffer		buffer;

	/* If not bulk-insert, exactly like ReadBuffer */
	if (!bistate)
		return ReadBuffer(relation, targetBlock);

	/* If we have the desired block already pinned, re-pin and return it */
	if (bistate->current_buf != InvalidBuffer)
	{
		if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
		{
			IncrBufferRefCount(bistate->current_buf);
			return bistate->current_buf;
		}
		/* ... else drop the old buffer */
		ReleaseBuffer(bistate->current_buf);
		bistate->current_buf = InvalidBuffer;
	}

	/* Perform a read using the buffer strategy */
	buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
								RBM_NORMAL, bistate->strategy);

	/* Save the selected block as target for future inserts */
	IncrBufferRefCount(buffer);
	bistate->current_buf = buffer;

	return buffer;
}
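
/*
 * Illustrative sketch (an editorial addition, not part of the original
 * file): a bulk-loading caller, such as COPY, sets up the BulkInsertState
 * once and reuses it, which lets ReadBufferBI keep the current target page
 * pinned across calls and use a BULKWRITE buffer-access strategy:
 *
 *		BulkInsertState bistate = GetBulkInsertState();
 *
 *		for each tuple to load:
 *			heap_insert(relation, tuple, mycid, 0, bistate);
 *
 *		FreeBulkInsertState(bistate);
 */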

/*
 * For each heap page which is all-visible, acquire a pin on the appropriate
 * visibility map page, if we haven't already got one.
 *
 * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
 * must not be InvalidBuffer.  If both buffers are specified, buffer1 must
 * be less than buffer2.
 */
static void
GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
					 BlockNumber block1, BlockNumber block2,
					 Buffer *vmbuffer1, Buffer *vmbuffer2)
{
	bool		need_to_pin_buffer1;
	bool		need_to_pin_buffer2;

	Assert(BufferIsValid(buffer1));
	Assert(buffer2 == InvalidBuffer || buffer1 <= buffer2);

	while (1)
	{
		/* Figure out which pins we need but don't have. */
		need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
			&& !visibilitymap_pin_ok(block1, *vmbuffer1);
		need_to_pin_buffer2 = buffer2 != InvalidBuffer
			&& PageIsAllVisible(BufferGetPage(buffer2))
			&& !visibilitymap_pin_ok(block2, *vmbuffer2);
		if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
			return;

		/* We must unlock both buffers before doing any I/O. */
		LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
			LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);

		/* Get pins. */
		if (need_to_pin_buffer1)
			visibilitymap_pin(relation, block1, vmbuffer1);
		if (need_to_pin_buffer2)
			visibilitymap_pin(relation, block2, vmbuffer2);

		/* Relock buffers. */
		LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
			LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);

		/*
		 * If there are two buffers involved and we pinned just one of them,
		 * it's possible that the second one became all-visible while we were
		 * busy pinning the first one.  If it looks like that's a possible
		 * scenario, we'll need to make a second pass through this loop.
		 */
		if (buffer2 == InvalidBuffer || buffer1 == buffer2
			|| (need_to_pin_buffer1 && need_to_pin_buffer2))
			break;
	}
}

/*
 * RelationGetBufferForTuple
 *
 *	Returns pinned and exclusive-locked buffer of a page in given relation
 *	with free space >= given len.
 *
 *	If otherBuffer is not InvalidBuffer, then it references a previously
 *	pinned buffer of another page in the same relation; on return, this
 *	buffer will also be exclusive-locked.  (This case is used by heap_update;
 *	the otherBuffer contains the tuple being updated.)
 *
 *	The reason for passing otherBuffer is that if two backends are doing
 *	concurrent heap_update operations, a deadlock could occur if they try
 *	to lock the same two buffers in opposite orders.  To ensure that this
 *	can't happen, we impose the rule that buffers of a relation must be
 *	locked in increasing page number order.  This is most conveniently done
 *	by having RelationGetBufferForTuple lock them both, with suitable care
 *	for ordering.
 *
 *	NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
 *	same buffer we select for insertion of the new tuple (this could only
 *	happen if space is freed in that page after heap_update finds there's not
 *	enough there).  In that case, the page will be pinned and locked only once.
 *
 *	For the vmbuffer and vmbuffer_other arguments, we avoid deadlock by
 *	locking them only after locking the corresponding heap page, and taking
 *	no further lwlocks while they are locked.
 *
 *	We normally use FSM to help us find free space.  However,
 *	if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
 *	the end of the relation if the tuple won't fit on the current target page.
 *	This can save some cycles when we know the relation is new and doesn't
 *	contain useful amounts of free space.
 *
 *	HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
 *	relation, if the caller holds exclusive lock and is careful to invalidate
 *	the relation's smgr_targblock before the first insertion --- that ensures
 *	that all insertions will occur into newly added pages and not be
 *	intermixed with tuples from other transactions.  That way, a crash can't
 *	risk losing any committed data of other transactions.  (See heap_insert's
 *	comments for additional constraints needed for safe usage of this
 *	behavior.)
 *
 *	The caller can also provide a BulkInsertState object to optimize many
 *	insertions into the same relation.  This keeps a pin on the current
 *	insertion target page (to save pin/unpin cycles) and also passes a
 *	BULKWRITE buffer selection strategy object to the buffer manager.
 *	Passing NULL for bistate selects the default behavior.
 *
 *	We always try to avoid filling existing pages further than the fillfactor.
 *	This is OK since this routine is not consulted when updating a tuple and
 *	keeping it on the same page, which is the scenario fillfactor is meant
 *	to reserve space for.
 *
 *	ereport(ERROR) is allowed here, so this routine *must* be called
 *	before any (unlogged) changes are made in the buffer pool.
 */
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
						  Buffer otherBuffer, int options,
						  BulkInsertState bistate,
						  Buffer *vmbuffer, Buffer *vmbuffer_other)
{
	bool		use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
	Buffer		buffer = InvalidBuffer;
	Page		page;
	Size		pageFreeSpace,
				saveFreeSpace;
	BlockNumber targetBlock,
				otherBlock;
	bool		needLock;

	len = MAXALIGN(len);		/* be conservative */

	/* Bulk insert is not supported for updates, only inserts. */
	Assert(otherBuffer == InvalidBuffer || !bistate);

	/*
	 * If we're going to fail for an oversize tuple, do it right away
	 */
	if (len > MaxHeapTupleSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("row is too big: size %zu, maximum size %zu",
						len, MaxHeapTupleSize)));

	/* Compute desired extra freespace due to fillfactor option */
	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
												   HEAP_DEFAULT_FILLFACTOR);

	if (otherBuffer != InvalidBuffer)
		otherBlock = BufferGetBlockNumber(otherBuffer);
	else
		otherBlock = InvalidBlockNumber;	/* just to keep compiler quiet */

	/*
	 * We first try to put the tuple on the same page we last inserted a tuple
	 * on, as cached in the BulkInsertState or relcache entry.  If that
	 * doesn't work, we ask the Free Space Map to locate a suitable page.
	 * Since the FSM's info might be out of date, we have to be prepared to
	 * loop around and retry multiple times. (To ensure this isn't an infinite
	 * loop, we must update the FSM with the correct amount of free space on
	 * each page that proves not to be suitable.)  If the FSM has no record of
	 * a page with enough free space, we give up and extend the relation.
	 *
	 * When use_fsm is false, we either put the tuple onto the existing target
	 * page or extend the relation.
	 */
	if (len + saveFreeSpace > MaxHeapTupleSize)
	{
		/* can't fit, don't bother asking FSM */
		targetBlock = InvalidBlockNumber;
		use_fsm = false;
	}
	else if (bistate && bistate->current_buf != InvalidBuffer)
		targetBlock = BufferGetBlockNumber(bistate->current_buf);
	else
		targetBlock = RelationGetTargetBlock(relation);

	if (targetBlock == InvalidBlockNumber && use_fsm)
	{
		/*
		 * We have no cached target page, so ask the FSM for an initial
		 * target.
		 */
		targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);

		/*
		 * If the FSM knows nothing of the rel, try the last page before we
		 * give up and extend.  This avoids one-tuple-per-page syndrome during
		 * bootstrapping or in a recently-started system.
		 */
		if (targetBlock == InvalidBlockNumber)
		{
			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);

			if (nblocks > 0)
				targetBlock = nblocks - 1;
		}
	}

	while (targetBlock != InvalidBlockNumber)
	{
		/*
		 * Read and exclusive-lock the target block, as well as the other
		 * block if one was given, taking suitable care with lock ordering and
		 * the possibility they are the same block.
		 *
		 * If the page-level all-visible flag is set, caller will need to
		 * clear both that and the corresponding visibility map bit.  However,
		 * by the time we return, we'll have x-locked the buffer, and we don't
		 * want to do any I/O while in that state.  So we check the bit here
		 * before taking the lock, and pin the page if it appears necessary.
		 * Checking without the lock creates a risk of getting the wrong
		 * answer, so we'll have to recheck after acquiring the lock.
		 */
		if (otherBuffer == InvalidBuffer)
		{
			/* easy case */
			buffer = ReadBufferBI(relation, targetBlock, bistate);
			if (PageIsAllVisible(BufferGetPage(buffer)))
				visibilitymap_pin(relation, targetBlock, vmbuffer);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else if (otherBlock == targetBlock)
		{
			/* also easy case */
			buffer = otherBuffer;
			if (PageIsAllVisible(BufferGetPage(buffer)))
				visibilitymap_pin(relation, targetBlock, vmbuffer);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else if (otherBlock < targetBlock)
		{
			/* lock other buffer first */
			buffer = ReadBuffer(relation, targetBlock);
			if (PageIsAllVisible(BufferGetPage(buffer)))
				visibilitymap_pin(relation, targetBlock, vmbuffer);
			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else
		{
			/* lock target buffer first */
			buffer = ReadBuffer(relation, targetBlock);
			if (PageIsAllVisible(BufferGetPage(buffer)))
				visibilitymap_pin(relation, targetBlock, vmbuffer);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
		}

		/*
		 * We now have the target page (and the other buffer, if any) pinned
		 * and locked.  However, since our initial PageIsAllVisible checks
		 * were performed before acquiring the lock, the results might now be
		 * out of date, either for the selected victim buffer, or for the
		 * other buffer passed by the caller.  In that case, we'll need to
		 * give up our locks, go get the pin(s) we failed to get earlier, and
		 * re-lock.  That's pretty painful, but hopefully shouldn't happen
		 * often.
		 *
		 * Note that there's a small possibility that we didn't pin the page
		 * above but still have the correct page pinned anyway, either because
		 * we've already made a previous pass through this loop, or because
		 * caller passed us the right page anyway.
		 *
		 * Note also that it's possible that by the time we get the pin and
		 * retake the buffer locks, the visibility map bit will have been
		 * cleared by some other backend anyway.  In that case, we'll have
		 * done a bit of extra work for no gain, but there's no real harm
		 * done.
		 */
		if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
			GetVisibilityMapPins(relation, buffer, otherBuffer,
								 targetBlock, otherBlock, vmbuffer,
								 vmbuffer_other);
		else
			GetVisibilityMapPins(relation, otherBuffer, buffer,
								 otherBlock, targetBlock, vmbuffer_other,
								 vmbuffer);

		/*
		 * Now we can check to see if there's enough free space here. If so,
		 * we're done.
		 */
		page = BufferGetPage(buffer);
		pageFreeSpace = PageGetHeapFreeSpace(page);
		if (len + saveFreeSpace <= pageFreeSpace)
		{
			/* use this page as future insert target, too */
			RelationSetTargetBlock(relation, targetBlock);
			return buffer;
		}

		/*
		 * Not enough space, so we must give up our page locks and pin (if
		 * any) and prepare to look elsewhere.  We don't care which order we
		 * unlock the two buffers in, so this can be slightly simpler than the
		 * code above.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		if (otherBuffer == InvalidBuffer)
			ReleaseBuffer(buffer);
		else if (otherBlock != targetBlock)
		{
			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
		}

		/* Without FSM, always fall out of the loop and extend */
		if (!use_fsm)
			break;

		/*
		 * Update FSM as to condition of this page, and ask for another page
		 * to try.
		 */
		targetBlock = RecordAndGetPageWithFreeSpace(relation,
													targetBlock,
													pageFreeSpace,
													len + saveFreeSpace);
	}

	/*
	 * Have to extend the relation.
	 *
	 * We have to use a lock to ensure no one else is extending the rel at the
	 * same time, else we will both try to initialize the same new page.  We
	 * can skip locking for new or temp relations, however, since no one else
	 * could be accessing them.
	 */
	needLock = !RELATION_IS_LOCAL(relation);

	if (needLock)
		LockRelationForExtension(relation, ExclusiveLock);

	/*
	 * XXX This does an lseek - rather expensive - but at the moment it is the
	 * only way to accurately determine how many blocks are in a relation.  Is
	 * it worth keeping an accurate file length in shared memory someplace,
	 * rather than relying on the kernel to do it for us?
	 */
	buffer = ReadBufferBI(relation, P_NEW, bistate);

	/*
	 * We can be certain that locking the otherBuffer first is OK, since it
	 * must have a lower page number.
	 */
	if (otherBuffer != InvalidBuffer)
		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * Now acquire lock on the new page.
	 */
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * Release the file-extension lock; it's now OK for someone else to extend
	 * the relation some more.  Note that we cannot release this lock before
	 * we have buffer lock on the new page, or we risk a race condition
	 * against vacuumlazy.c --- see comments therein.
	 */
	if (needLock)
		UnlockRelationForExtension(relation, ExclusiveLock);

	/*
	 * We need to initialize the empty new page.  Double-check that it really
	 * is empty (this should never happen, but if it does we don't want to
	 * risk wiping out valid data).
	 */
	page = BufferGetPage(buffer);

	if (!PageIsNew(page))
		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
			 BufferGetBlockNumber(buffer),
			 RelationGetRelationName(relation));

	PageInit(page, BufferGetPageSize(buffer), 0);

	if (len > PageGetHeapFreeSpace(page))
	{
		/* We should not get here given the test at the top */
		elog(PANIC, "tuple is too big: size %zu", len);
	}

	/*
	 * Remember the new page as our target for future insertions.
	 *
	 * XXX should we enter the new page into the free space map immediately,
	 * or just keep it for this backend's exclusive use in the short run
	 * (until VACUUM sees it)?  Seems to depend on whether you expect the
	 * current backend to make more insertions or not, which is probably a
	 * good bet most of the time.  So for now, don't add it to FSM yet.
	 */
	RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));

	return buffer;
}
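
/*
 * Illustrative sketch (an editorial addition, not part of the original
 * file): heap_update is the path that supplies otherBuffer.  At the point
 * of the call, the buffer holding the old tuple is still pinned but no
 * longer content-locked; RelationGetBufferForTuple relocks both pages
 * itself, always in increasing block-number order, so two concurrent
 * updaters cannot deadlock on the same pair of pages.  Roughly:
 *
 *		newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
 *										   buffer, 0, NULL,
 *										   &vmbuffer_new, &vmbuffer);
 *
 * On return, both buffer and newbuf are exclusive-locked and any needed
 * visibility map pages are pinned, so the caller can place the new tuple
 * and mark the old one updated in one critical section without further I/O.
 */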