/*-------------------------------------------------------------------------
 *
 * hash.c
 *        Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/access/hash/hash.c,v 1.86 2006/02/11 23:31:33 tgl Exp $
 *
 * NOTES
 *        This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/xlogutils.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"


/* Working state for hashbuild and its callback */
typedef struct
{
        double          indtuples;
} HashBuildState;

static void hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state);


/*
 *      hashbuild() -- build a new hash index.
 */
Datum
hashbuild(PG_FUNCTION_ARGS)
{
        Relation        heap = (Relation) PG_GETARG_POINTER(0);
        Relation        index = (Relation) PG_GETARG_POINTER(1);
        IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
        double          reltuples;
        HashBuildState buildstate;

        /*
         * We expect to be called exactly once for any index relation. If that's
         * not the case, big trouble's what we have.
         */
        if (RelationGetNumberOfBlocks(index) != 0)
                elog(ERROR, "index \"%s\" already contains data",
                         RelationGetRelationName(index));

        /* initialize the hash index metadata page */
        _hash_metapinit(index);

        /* build the index */
        buildstate.indtuples = 0;

        /* do the heap scan */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo,
                                                                   hashbuildCallback, (void *) &buildstate);

        /* since we just counted the # of tuples, may as well update stats */
        IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);

        PG_RETURN_VOID();
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state)
{
        HashBuildState *buildstate = (HashBuildState *) state;
        IndexTuple      itup;

        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index), values, isnull);
        itup->t_tid = htup->t_self;

        /* Hash indexes don't index nulls, see notes in hashinsert */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                return;
        }

        _hash_doinsert(index, itup);

        buildstate->indtuples += 1;

        pfree(itup);
}

/*
 *      hashinsert() -- insert an index tuple into a hash table.
 *
 *      Hash on the index tuple's key, find the appropriate location
 *      for the new tuple, and put it there.
 */
Datum
hashinsert(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        Datum      *values = (Datum *) PG_GETARG_POINTER(1);
        bool       *isnull = (bool *) PG_GETARG_POINTER(2);
        ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
        Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
        bool            checkUnique = PG_GETARG_BOOL(5);
#endif
        IndexTuple      itup;

        /* generate an index tuple */
        itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
        itup->t_tid = *ht_ctid;

        /*
         * If the single index key is null, we don't insert it into the index.
         * Hash tables support scans on '='.  SQL's three-valued logic says that
         * A = B yields null if either A or B is null.  This means that no
         * qualification used in an index scan could ever return true on a null
         * attribute.  It also means that indices can't be used by ISNULL or
         * NOTNULL scans, but that's an artifact of the strategy map architecture
         * chosen in 1986, not of the way nulls are handled here.
         */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                PG_RETURN_BOOL(false);
        }

        _hash_doinsert(rel, itup);

        pfree(itup);

        PG_RETURN_BOOL(true);
}
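
#ifdef NOT_USED
/*
 * Illustrative sketch only, not part of the hash AM: callers never invoke
 * hashinsert() directly.  The executor goes through the generic
 * index_insert() entry point (access/genam.h), which looks up the AM's
 * insert procedure in pg_am and calls it via the fmgr, with the arguments
 * packaged as the Datums unpacked above.  The helper name below is purely
 * hypothetical.
 */
static void
example_insert_via_am(Relation heapRel, Relation indexRel,
                                          Datum *values, bool *isnull, HeapTuple heapTuple)
{
        (void) index_insert(indexRel, values, isnull,
                                                &(heapTuple->t_self),   /* heap TID the entry points at */
                                                heapRel,
                                                false);                 /* no uniqueness check for hash */
}
#endif   /* NOT_USED */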


/*
 *      hashgettuple() -- Get the next tuple in the scan.
 */
Datum
hashgettuple(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
        Page            page;
        OffsetNumber offnum;
        bool            res;

        /*
         * We hold pin but not lock on current buffer while outside the hash AM.
         * Reacquire the read lock here.
         */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

        /*
         * If we've already initialized this scan, we can just advance it in the
         * appropriate direction.  If we haven't done so yet, we call a routine to
         * get the first item in the scan.
         */
        if (ItemPointerIsValid(&(scan->currentItemData)))
        {
                /*
                 * Check to see if we should kill the previously-fetched tuple.
                 */
                if (scan->kill_prior_tuple)
                {
                        /*
                         * Yes, so mark it by setting the LP_DELETE bit in the item flags.
                         */
                        offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
                        page = BufferGetPage(so->hashso_curbuf);
                        PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;

                        /*
                         * Since this can be redone later if needed, it's treated the same
                         * as a commit-hint-bit status update for heap tuples: we mark the
                         * buffer dirty but don't make a WAL log entry.
                         */
                        SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
                }

                /*
                 * Now continue the scan.
                 */
                res = _hash_next(scan, dir);
        }
        else
                res = _hash_first(scan, dir);

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
                while (res)
                {
                        offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
                        page = BufferGetPage(so->hashso_curbuf);
                        if (!ItemIdDeleted(PageGetItemId(page, offnum)))
                                break;
                        res = _hash_next(scan, dir);
                }
        }

        /* Release read lock on current buffer, but keep it pinned */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

        PG_RETURN_BOOL(res);
}


/*
 *      hashgetmulti() -- get multiple tuples at once
 *
 * This is a somewhat generic implementation: it avoids lock reacquisition
 * overhead, but there's no smarts about picking especially good stopping
 * points such as index page boundaries.
 */
Datum
hashgetmulti(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
        int32           max_tids = PG_GETARG_INT32(2);
        int32      *returned_tids = (int32 *) PG_GETARG_POINTER(3);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
        bool            res = true;
        int32           ntids = 0;

        /*
         * We hold pin but not lock on current buffer while outside the hash AM.
         * Reacquire the read lock here.
         */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

        while (ntids < max_tids)
        {
                /*
                 * Start scan, or advance to next tuple.
                 */
                if (ItemPointerIsValid(&(scan->currentItemData)))
                        res = _hash_next(scan, ForwardScanDirection);
                else
                        res = _hash_first(scan, ForwardScanDirection);

                /*
                 * Skip killed tuples if asked to.
                 */
                if (scan->ignore_killed_tuples)
                {
                        while (res)
                        {
                                Page            page;
                                OffsetNumber offnum;

                                offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
                                page = BufferGetPage(so->hashso_curbuf);
                                if (!ItemIdDeleted(PageGetItemId(page, offnum)))
                                        break;
                                res = _hash_next(scan, ForwardScanDirection);
                        }
                }

                if (!res)
                        break;
                /* Save tuple ID, and continue scanning */
                tids[ntids] = scan->xs_ctup.t_self;
                ntids++;
        }

        /* Release read lock on current buffer, but keep it pinned */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

        *returned_tids = ntids;
        PG_RETURN_BOOL(res);
}
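
#ifdef NOT_USED
/*
 * Illustrative sketch only, not part of the hash AM: a caller such as a
 * bitmap index scan drains the scan in batches through the generic
 * index_getmulti() wrapper (access/genam.h), which dispatches to
 * hashgetmulti() above.  The helper name and array size are purely
 * hypothetical.
 */
static void
example_drain_scan(IndexScanDesc scan)
{
        ItemPointerData tids[1024];
        int32           ntids;
        bool            more;

        do
        {
                /* fetch up to lengthof(tids) heap TIDs per call */
                more = index_getmulti(scan, tids, lengthof(tids), &ntids);

                /* ... caller would add tids[0 .. ntids-1] to its bitmap here ... */
        } while (more);
}
#endif   /* NOT_USED */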


/*
 *      hashbeginscan() -- start a scan on a hash index
 */
Datum
hashbeginscan(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        int                     keysz = PG_GETARG_INT32(1);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(2);
        IndexScanDesc scan;
        HashScanOpaque so;

        scan = RelationGetIndexScan(rel, keysz, scankey);
        so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
        so->hashso_bucket_valid = false;
        so->hashso_bucket_blkno = 0;
        so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer;
        scan->opaque = so;

        /* register scan in case we change pages it's using */
        _hash_regscan(scan);

        PG_RETURN_POINTER(scan);
}

/*
 *      hashrescan() -- rescan an index relation
 */
Datum
hashrescan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* if we are called from beginscan, so is still NULL */
        if (so)
        {
                /* release any pins we still hold */
                if (BufferIsValid(so->hashso_curbuf))
                        _hash_dropbuf(rel, so->hashso_curbuf);
                so->hashso_curbuf = InvalidBuffer;

                if (BufferIsValid(so->hashso_mrkbuf))
                        _hash_dropbuf(rel, so->hashso_mrkbuf);
                so->hashso_mrkbuf = InvalidBuffer;

                /* release lock on bucket, too */
                if (so->hashso_bucket_blkno)
                        _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
                so->hashso_bucket_blkno = 0;
        }

        /* set positions invalid (this will cause _hash_first call) */
        ItemPointerSetInvalid(&(scan->currentItemData));
        ItemPointerSetInvalid(&(scan->currentMarkData));

        /* Update scan key, if a new one is given */
        if (scankey && scan->numberOfKeys > 0)
        {
                memmove(scan->keyData,
                                scankey,
                                scan->numberOfKeys * sizeof(ScanKeyData));
                if (so)
                        so->hashso_bucket_valid = false;
        }

        PG_RETURN_VOID();
}

/*
 *      hashendscan() -- close down a scan
 */
Datum
hashendscan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* don't need scan registered anymore */
        _hash_dropscan(scan);

        /* release any pins we still hold */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;

        if (BufferIsValid(so->hashso_mrkbuf))
                _hash_dropbuf(rel, so->hashso_mrkbuf);
        so->hashso_mrkbuf = InvalidBuffer;

        /* release lock on bucket, too */
        if (so->hashso_bucket_blkno)
                _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
        so->hashso_bucket_blkno = 0;

        /* be tidy */
        ItemPointerSetInvalid(&(scan->currentItemData));
        ItemPointerSetInvalid(&(scan->currentMarkData));

        pfree(so);
        scan->opaque = NULL;

        PG_RETURN_VOID();
}

/*
 *      hashmarkpos() -- save current scan position
 */
Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* release pin on old marked data, if any */
        if (BufferIsValid(so->hashso_mrkbuf))
                _hash_dropbuf(rel, so->hashso_mrkbuf);
        so->hashso_mrkbuf = InvalidBuffer;
        ItemPointerSetInvalid(&(scan->currentMarkData));

        /* bump pin count on currentItemData and copy to currentMarkData */
        if (ItemPointerIsValid(&(scan->currentItemData)))
        {
                IncrBufferRefCount(so->hashso_curbuf);
                so->hashso_mrkbuf = so->hashso_curbuf;
                scan->currentMarkData = scan->currentItemData;
        }

        PG_RETURN_VOID();
}

/*
 *      hashrestrpos() -- restore scan to last saved position
 */
Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* release pin on current data, if any */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;
        ItemPointerSetInvalid(&(scan->currentItemData));

        /* bump pin count on currentMarkData and copy to currentItemData */
        if (ItemPointerIsValid(&(scan->currentMarkData)))
        {
                IncrBufferRefCount(so->hashso_mrkbuf);
                so->hashso_curbuf = so->hashso_mrkbuf;
                scan->currentItemData = scan->currentMarkData;
        }

        PG_RETURN_VOID();
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
        void       *callback_state = (void *) PG_GETARG_POINTER(2);
        IndexBulkDeleteResult *result;
        BlockNumber num_pages;
        double          tuples_removed;
        double          num_index_tuples;
        double          orig_ntuples;
        Bucket          orig_maxbucket;
        Bucket          cur_maxbucket;
        Bucket          cur_bucket;
        Buffer          metabuf;
        HashMetaPage metap;
        HashMetaPageData local_metapage;

        tuples_removed = 0;
        num_index_tuples = 0;

        /*
         * Read the metapage to fetch original bucket and tuple counts.  Also, we
         * keep a copy of the last-seen metapage so that we can use its
         * hashm_spares[] values to compute bucket page addresses.  This is a bit
         * hokey but perfectly safe, since the interesting entries in the spares
         * array cannot change under us; and it beats rereading the metapage for
         * each bucket.
         */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
        _hash_checkpage(rel, metabuf, LH_META_PAGE);
        metap = (HashMetaPage) BufferGetPage(metabuf);
        orig_maxbucket = metap->hashm_maxbucket;
        orig_ntuples = metap->hashm_ntuples;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
        _hash_relbuf(rel, metabuf);

        /* Scan the buckets that we know exist */
        cur_bucket = 0;
        cur_maxbucket = orig_maxbucket;

loop_top:

        /*
         * If we don't have anything to delete, skip the scan, and report the
         * number of tuples shown in the metapage.  (Unlike btree and gist,
         * we can trust this number even for a partial index.)
         */
        if (!callback_state)
        {
                cur_bucket = cur_maxbucket + 1;
                num_index_tuples = local_metapage.hashm_ntuples;
        }

        while (cur_bucket <= cur_maxbucket)
        {
                BlockNumber bucket_blkno;
                BlockNumber blkno;
                bool            bucket_dirty = false;

                /* Get address of bucket's start page */
                bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

                /* Exclusive-lock the bucket so we can shrink it */
                _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Shouldn't have any active scans locally, either */
                if (_hash_has_active_scan(rel, cur_bucket))
                        elog(ERROR, "hash index has active scan during VACUUM");

                /* Scan each page in bucket */
                blkno = bucket_blkno;
                while (BlockNumberIsValid(blkno))
                {
                        Buffer          buf;
                        Page            page;
                        HashPageOpaque opaque;
                        OffsetNumber offno;
                        OffsetNumber maxoffno;
                        bool            page_dirty = false;

                        vacuum_delay_point();

                        buf = _hash_getbuf(rel, blkno, HASH_WRITE);
                        _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
                        page = BufferGetPage(buf);
                        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
                        Assert(opaque->hasho_bucket == cur_bucket);

                        /* Scan each tuple in page */
                        offno = FirstOffsetNumber;
                        maxoffno = PageGetMaxOffsetNumber(page);
                        while (offno <= maxoffno)
                        {
                                IndexTuple      itup;
                                ItemPointer htup;

                                itup = (IndexTuple) PageGetItem(page,
                                                                                                PageGetItemId(page, offno));
                                htup = &(itup->t_tid);
                                if (callback(htup, callback_state))
                                {
                                        /* delete the item from the page */
                                        PageIndexTupleDelete(page, offno);
                                        bucket_dirty = page_dirty = true;

                                        /* don't increment offno, instead decrement maxoffno */
                                        maxoffno = OffsetNumberPrev(maxoffno);

                                        tuples_removed += 1;
                                }
                                else
                                {
                                        offno = OffsetNumberNext(offno);

                                        num_index_tuples += 1;
                                }
                        }

                        /*
                         * Write page if needed, advance to next page.
                         */
                        blkno = opaque->hasho_nextblkno;

                        if (page_dirty)
                                _hash_wrtbuf(rel, buf);
                        else
                                _hash_relbuf(rel, buf);
                }

                /* If we deleted anything, try to compact free space */
                if (bucket_dirty)
                        _hash_squeezebucket(rel, cur_bucket, bucket_blkno);

                /* Release bucket lock */
                _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Advance to next bucket */
                cur_bucket++;
        }

        /* Write-lock metapage and check for split since we started */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
        _hash_checkpage(rel, metabuf, LH_META_PAGE);
        metap = (HashMetaPage) BufferGetPage(metabuf);

        if (cur_maxbucket != metap->hashm_maxbucket)
        {
                /* There's been a split, so process the additional bucket(s) */
                cur_maxbucket = metap->hashm_maxbucket;
                memcpy(&local_metapage, metap, sizeof(local_metapage));
                _hash_relbuf(rel, metabuf);
                goto loop_top;
        }

        /* Okay, we're really done.  Update tuple count in metapage. */

        if (orig_maxbucket == metap->hashm_maxbucket &&
                orig_ntuples == metap->hashm_ntuples)
        {
                /*
                 * No one has split or inserted anything since start of scan, so
                 * believe our count as gospel.
                 */
                metap->hashm_ntuples = num_index_tuples;
        }
        else
        {
                /*
                 * Otherwise, our count is untrustworthy since we may have
                 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
                 */
                if (metap->hashm_ntuples > tuples_removed)
                        metap->hashm_ntuples -= tuples_removed;
                else
                        metap->hashm_ntuples = 0;
                num_index_tuples = metap->hashm_ntuples;
        }

        _hash_wrtbuf(rel, metabuf);

        /* return statistics */
        num_pages = RelationGetNumberOfBlocks(rel);

        result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        result->num_pages = num_pages;
        result->num_index_tuples = num_index_tuples;
        result->tuples_removed = tuples_removed;

        PG_RETURN_POINTER(result);
}


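/*
 * WAL support is not implemented for hash indexes in this version: no hash
 * operation emits WAL records (note the hint-bit-style handling of
 * LP_DELETE above), so recovery should never hand us a hash record.  The
 * PANIC below guards against that, and hash_desc has nothing to describe.
 */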
void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
        elog(PANIC, "hash_redo: unimplemented");
}

void
hash_desc(char *buf, uint8 xl_info, char *rec)
{
}