/* Mirrored from the PostgreSQL repository: contrib/pg_visibility/pg_visibility.c (post-PG 10 beta1 pgindent run) */
1 /*-------------------------------------------------------------------------
2  *
3  * pg_visibility.c
4  *        display visibility map information and page-level visibility bits
5  *
6  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
7  *
8  *        contrib/pg_visibility/pg_visibility.c
9  *-------------------------------------------------------------------------
10  */
11 #include "postgres.h"
12
13 #include "access/htup_details.h"
14 #include "access/visibilitymap.h"
15 #include "catalog/pg_type.h"
16 #include "catalog/storage_xlog.h"
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 #include "storage/bufmgr.h"
20 #include "storage/procarray.h"
21 #include "storage/smgr.h"
22 #include "utils/rel.h"
23
24 PG_MODULE_MAGIC;
25
/*
 * Visibility information for a range of blocks, as collected by
 * collect_visibility_data().  bits[] holds one byte per block:
 * bit 0 = all-visible in the VM, bit 1 = all-frozen in the VM,
 * bit 2 = page-level PD_ALL_VISIBLE (only set when page data was requested).
 */
typedef struct vbits
{
	BlockNumber next;			/* next block to hand back to the SRF caller */
	BlockNumber count;			/* number of blocks covered by bits[] */
	uint8		bits[FLEXIBLE_ARRAY_MEMBER];	/* per-block flag bytes */
} vbits;

/*
 * Growable array of corrupt tuple identifiers, built by
 * collect_corrupt_items().  While being filled, "next" is the next write
 * slot and "count" is the allocated capacity; before being returned the
 * fields are repurposed so that "next" is the next item to read and
 * "count" is the number of items stored.
 */
typedef struct corrupt_items
{
	BlockNumber next;			/* write cursor, then read cursor */
	BlockNumber count;			/* capacity, then number of items */
	ItemPointer tids;			/* palloc'd array of corrupt TIDs */
} corrupt_items;
39
40 PG_FUNCTION_INFO_V1(pg_visibility_map);
41 PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
42 PG_FUNCTION_INFO_V1(pg_visibility);
43 PG_FUNCTION_INFO_V1(pg_visibility_rel);
44 PG_FUNCTION_INFO_V1(pg_visibility_map_summary);
45 PG_FUNCTION_INFO_V1(pg_check_frozen);
46 PG_FUNCTION_INFO_V1(pg_check_visible);
47 PG_FUNCTION_INFO_V1(pg_truncate_visibility_map);
48
49 static TupleDesc pg_visibility_tupdesc(bool include_blkno, bool include_pd);
50 static vbits *collect_visibility_data(Oid relid, bool include_pd);
51 static corrupt_items *collect_corrupt_items(Oid relid, bool all_visible,
52                                           bool all_frozen);
53 static void record_corrupt_item(corrupt_items *items, ItemPointer tid);
54 static bool tuple_all_visible(HeapTuple tup, TransactionId OldestXmin,
55                                   Buffer buffer);
56 static void check_relation_relkind(Relation rel);
57
58 /*
59  * Visibility map information for a single block of a relation.
60  *
61  * Note: the VM code will silently return zeroes for pages past the end
62  * of the map, so we allow probes up to MaxBlockNumber regardless of the
63  * actual relation size.
64  */
65 Datum
66 pg_visibility_map(PG_FUNCTION_ARGS)
67 {
68         Oid                     relid = PG_GETARG_OID(0);
69         int64           blkno = PG_GETARG_INT64(1);
70         int32           mapbits;
71         Relation        rel;
72         Buffer          vmbuffer = InvalidBuffer;
73         TupleDesc       tupdesc;
74         Datum           values[2];
75         bool            nulls[2];
76
77         rel = relation_open(relid, AccessShareLock);
78
79         /* Only some relkinds have a visibility map */
80         check_relation_relkind(rel);
81
82         if (blkno < 0 || blkno > MaxBlockNumber)
83                 ereport(ERROR,
84                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85                                  errmsg("invalid block number")));
86
87         tupdesc = pg_visibility_tupdesc(false, false);
88         MemSet(nulls, 0, sizeof(nulls));
89
90         mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
91         if (vmbuffer != InvalidBuffer)
92                 ReleaseBuffer(vmbuffer);
93         values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
94         values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
95
96         relation_close(rel, AccessShareLock);
97
98         PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
99 }
100
101 /*
102  * Visibility map information for a single block of a relation, plus the
103  * page-level information for the same block.
104  */
105 Datum
106 pg_visibility(PG_FUNCTION_ARGS)
107 {
108         Oid                     relid = PG_GETARG_OID(0);
109         int64           blkno = PG_GETARG_INT64(1);
110         int32           mapbits;
111         Relation        rel;
112         Buffer          vmbuffer = InvalidBuffer;
113         Buffer          buffer;
114         Page            page;
115         TupleDesc       tupdesc;
116         Datum           values[3];
117         bool            nulls[3];
118
119         rel = relation_open(relid, AccessShareLock);
120
121         /* Only some relkinds have a visibility map */
122         check_relation_relkind(rel);
123
124         if (blkno < 0 || blkno > MaxBlockNumber)
125                 ereport(ERROR,
126                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127                                  errmsg("invalid block number")));
128
129         tupdesc = pg_visibility_tupdesc(false, true);
130         MemSet(nulls, 0, sizeof(nulls));
131
132         mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
133         if (vmbuffer != InvalidBuffer)
134                 ReleaseBuffer(vmbuffer);
135         values[0] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0);
136         values[1] = BoolGetDatum((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0);
137
138         /* Here we have to explicitly check rel size ... */
139         if (blkno < RelationGetNumberOfBlocks(rel))
140         {
141                 buffer = ReadBuffer(rel, blkno);
142                 LockBuffer(buffer, BUFFER_LOCK_SHARE);
143
144                 page = BufferGetPage(buffer);
145                 values[2] = BoolGetDatum(PageIsAllVisible(page));
146
147                 UnlockReleaseBuffer(buffer);
148         }
149         else
150         {
151                 /* As with the vismap, silently return 0 for pages past EOF */
152                 values[2] = BoolGetDatum(false);
153         }
154
155         relation_close(rel, AccessShareLock);
156
157         PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
158 }
159
160 /*
161  * Visibility map information for every block in a relation.
162  */
163 Datum
164 pg_visibility_map_rel(PG_FUNCTION_ARGS)
165 {
166         FuncCallContext *funcctx;
167         vbits      *info;
168
169         if (SRF_IS_FIRSTCALL())
170         {
171                 Oid                     relid = PG_GETARG_OID(0);
172                 MemoryContext oldcontext;
173
174                 funcctx = SRF_FIRSTCALL_INIT();
175                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
176                 funcctx->tuple_desc = pg_visibility_tupdesc(true, false);
177                 /* collect_visibility_data will verify the relkind */
178                 funcctx->user_fctx = collect_visibility_data(relid, false);
179                 MemoryContextSwitchTo(oldcontext);
180         }
181
182         funcctx = SRF_PERCALL_SETUP();
183         info = (vbits *) funcctx->user_fctx;
184
185         if (info->next < info->count)
186         {
187                 Datum           values[3];
188                 bool            nulls[3];
189                 HeapTuple       tuple;
190
191                 MemSet(nulls, 0, sizeof(nulls));
192                 values[0] = Int64GetDatum(info->next);
193                 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
194                 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
195                 info->next++;
196
197                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
198                 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
199         }
200
201         SRF_RETURN_DONE(funcctx);
202 }
203
204 /*
205  * Visibility map information for every block in a relation, plus the page
206  * level information for each block.
207  */
208 Datum
209 pg_visibility_rel(PG_FUNCTION_ARGS)
210 {
211         FuncCallContext *funcctx;
212         vbits      *info;
213
214         if (SRF_IS_FIRSTCALL())
215         {
216                 Oid                     relid = PG_GETARG_OID(0);
217                 MemoryContext oldcontext;
218
219                 funcctx = SRF_FIRSTCALL_INIT();
220                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
221                 funcctx->tuple_desc = pg_visibility_tupdesc(true, true);
222                 /* collect_visibility_data will verify the relkind */
223                 funcctx->user_fctx = collect_visibility_data(relid, true);
224                 MemoryContextSwitchTo(oldcontext);
225         }
226
227         funcctx = SRF_PERCALL_SETUP();
228         info = (vbits *) funcctx->user_fctx;
229
230         if (info->next < info->count)
231         {
232                 Datum           values[4];
233                 bool            nulls[4];
234                 HeapTuple       tuple;
235
236                 MemSet(nulls, 0, sizeof(nulls));
237                 values[0] = Int64GetDatum(info->next);
238                 values[1] = BoolGetDatum((info->bits[info->next] & (1 << 0)) != 0);
239                 values[2] = BoolGetDatum((info->bits[info->next] & (1 << 1)) != 0);
240                 values[3] = BoolGetDatum((info->bits[info->next] & (1 << 2)) != 0);
241                 info->next++;
242
243                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
244                 SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
245         }
246
247         SRF_RETURN_DONE(funcctx);
248 }
249
250 /*
251  * Count the number of all-visible and all-frozen pages in the visibility
252  * map for a particular relation.
253  */
254 Datum
255 pg_visibility_map_summary(PG_FUNCTION_ARGS)
256 {
257         Oid                     relid = PG_GETARG_OID(0);
258         Relation        rel;
259         BlockNumber nblocks;
260         BlockNumber blkno;
261         Buffer          vmbuffer = InvalidBuffer;
262         int64           all_visible = 0;
263         int64           all_frozen = 0;
264         TupleDesc       tupdesc;
265         Datum           values[2];
266         bool            nulls[2];
267
268         rel = relation_open(relid, AccessShareLock);
269
270         /* Only some relkinds have a visibility map */
271         check_relation_relkind(rel);
272
273         nblocks = RelationGetNumberOfBlocks(rel);
274
275         for (blkno = 0; blkno < nblocks; ++blkno)
276         {
277                 int32           mapbits;
278
279                 /* Make sure we are interruptible. */
280                 CHECK_FOR_INTERRUPTS();
281
282                 /* Get map info. */
283                 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
284                 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
285                         ++all_visible;
286                 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
287                         ++all_frozen;
288         }
289
290         /* Clean up. */
291         if (vmbuffer != InvalidBuffer)
292                 ReleaseBuffer(vmbuffer);
293         relation_close(rel, AccessShareLock);
294
295         tupdesc = CreateTemplateTupleDesc(2, false);
296         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "all_visible", INT8OID, -1, 0);
297         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "all_frozen", INT8OID, -1, 0);
298         tupdesc = BlessTupleDesc(tupdesc);
299
300         MemSet(nulls, 0, sizeof(nulls));
301         values[0] = Int64GetDatum(all_visible);
302         values[1] = Int64GetDatum(all_frozen);
303
304         PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
305 }
306
307 /*
308  * Return the TIDs of non-frozen tuples present in pages marked all-frozen
309  * in the visibility map.  We hope no one will ever find any, but there could
310  * be bugs, database corruption, etc.
311  */
312 Datum
313 pg_check_frozen(PG_FUNCTION_ARGS)
314 {
315         FuncCallContext *funcctx;
316         corrupt_items *items;
317
318         if (SRF_IS_FIRSTCALL())
319         {
320                 Oid                     relid = PG_GETARG_OID(0);
321                 MemoryContext oldcontext;
322
323                 funcctx = SRF_FIRSTCALL_INIT();
324                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
325                 /* collect_corrupt_items will verify the relkind */
326                 funcctx->user_fctx = collect_corrupt_items(relid, false, true);
327                 MemoryContextSwitchTo(oldcontext);
328         }
329
330         funcctx = SRF_PERCALL_SETUP();
331         items = (corrupt_items *) funcctx->user_fctx;
332
333         if (items->next < items->count)
334                 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
335
336         SRF_RETURN_DONE(funcctx);
337 }
338
339 /*
340  * Return the TIDs of not-all-visible tuples in pages marked all-visible
341  * in the visibility map.  We hope no one will ever find any, but there could
342  * be bugs, database corruption, etc.
343  */
344 Datum
345 pg_check_visible(PG_FUNCTION_ARGS)
346 {
347         FuncCallContext *funcctx;
348         corrupt_items *items;
349
350         if (SRF_IS_FIRSTCALL())
351         {
352                 Oid                     relid = PG_GETARG_OID(0);
353                 MemoryContext oldcontext;
354
355                 funcctx = SRF_FIRSTCALL_INIT();
356                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
357                 /* collect_corrupt_items will verify the relkind */
358                 funcctx->user_fctx = collect_corrupt_items(relid, true, false);
359                 MemoryContextSwitchTo(oldcontext);
360         }
361
362         funcctx = SRF_PERCALL_SETUP();
363         items = (corrupt_items *) funcctx->user_fctx;
364
365         if (items->next < items->count)
366                 SRF_RETURN_NEXT(funcctx, PointerGetDatum(&items->tids[items->next++]));
367
368         SRF_RETURN_DONE(funcctx);
369 }
370
/*
 * Remove the visibility map fork for a relation.  If there turn out to be
 * any bugs in the visibility map code that require rebuilding the VM, this
 * provides users with a way to do it that is cleaner than shutting down the
 * server and removing files by hand.
 *
 * This is a cut-down version of RelationTruncate.
 */
Datum
pg_truncate_visibility_map(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	Relation	rel;

	/* Truncation requires that no one else touch the relation meanwhile. */
	rel = relation_open(relid, AccessExclusiveLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	/*
	 * Force the smgr layer to forget its cached VM size before truncating,
	 * so subsequent accesses recompute it from disk.
	 */
	RelationOpenSmgr(rel);
	rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;

	/* Truncate the VM fork to zero blocks. */
	visibilitymap_truncate(rel, 0);

	/*
	 * WAL-log the truncation so that replay (and standbys) perform the same
	 * VM truncation.  XLR_SPECIAL_REL_UPDATE flags this as a record that
	 * modifies the relation outside the normal buffer-page protocol.
	 */
	if (RelationNeedsWAL(rel))
	{
		xl_smgr_truncate xlrec;

		xlrec.blkno = 0;
		xlrec.rnode = rel->rd_node;
		xlrec.flags = SMGR_TRUNCATE_VM;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));

		XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
	}

	/*
	 * Release the lock right away, not at commit time.
	 *
	 * It would be a problem to release the lock prior to commit if this
	 * truncate operation sends any transactional invalidation messages. Other
	 * backends would potentially be able to lock the relation without
	 * processing them in the window of time between when we release the lock
	 * here and when we sent the messages at our eventual commit.  However,
	 * we're currently only sending a non-transactional smgr invalidation,
	 * which will have been posted to shared memory immediately from within
	 * visibilitymap_truncate.  Therefore, there should be no race here.
	 *
	 * The reason why it's desirable to release the lock early here is because
	 * of the possibility that someone will need to use this to blow away many
	 * visibility map forks at once.  If we can't release the lock until
	 * commit time, the transaction doing this will accumulate
	 * AccessExclusiveLocks on all of those relations at the same time, which
	 * is undesirable. However, if this turns out to be unsafe we may have no
	 * choice...
	 */
	relation_close(rel, AccessExclusiveLock);

	/* Nothing to return. */
	PG_RETURN_VOID();
}
434
435 /*
436  * Helper function to construct whichever TupleDesc we need for a particular
437  * call.
438  */
439 static TupleDesc
440 pg_visibility_tupdesc(bool include_blkno, bool include_pd)
441 {
442         TupleDesc       tupdesc;
443         AttrNumber      maxattr = 2;
444         AttrNumber      a = 0;
445
446         if (include_blkno)
447                 ++maxattr;
448         if (include_pd)
449                 ++maxattr;
450         tupdesc = CreateTemplateTupleDesc(maxattr, false);
451         if (include_blkno)
452                 TupleDescInitEntry(tupdesc, ++a, "blkno", INT8OID, -1, 0);
453         TupleDescInitEntry(tupdesc, ++a, "all_visible", BOOLOID, -1, 0);
454         TupleDescInitEntry(tupdesc, ++a, "all_frozen", BOOLOID, -1, 0);
455         if (include_pd)
456                 TupleDescInitEntry(tupdesc, ++a, "pd_all_visible", BOOLOID, -1, 0);
457         Assert(a == maxattr);
458
459         return BlessTupleDesc(tupdesc);
460 }
461
462 /*
463  * Collect visibility data about a relation.
464  *
465  * Checks relkind of relid and will throw an error if the relation does not
466  * have a VM.
467  */
468 static vbits *
469 collect_visibility_data(Oid relid, bool include_pd)
470 {
471         Relation        rel;
472         BlockNumber nblocks;
473         vbits      *info;
474         BlockNumber blkno;
475         Buffer          vmbuffer = InvalidBuffer;
476         BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
477
478         rel = relation_open(relid, AccessShareLock);
479
480         /* Only some relkinds have a visibility map */
481         check_relation_relkind(rel);
482
483         nblocks = RelationGetNumberOfBlocks(rel);
484         info = palloc0(offsetof(vbits, bits) +nblocks);
485         info->next = 0;
486         info->count = nblocks;
487
488         for (blkno = 0; blkno < nblocks; ++blkno)
489         {
490                 int32           mapbits;
491
492                 /* Make sure we are interruptible. */
493                 CHECK_FOR_INTERRUPTS();
494
495                 /* Get map info. */
496                 mapbits = (int32) visibilitymap_get_status(rel, blkno, &vmbuffer);
497                 if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
498                         info->bits[blkno] |= (1 << 0);
499                 if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
500                         info->bits[blkno] |= (1 << 1);
501
502                 /*
503                  * Page-level data requires reading every block, so only get it if the
504                  * caller needs it.  Use a buffer access strategy, too, to prevent
505                  * cache-trashing.
506                  */
507                 if (include_pd)
508                 {
509                         Buffer          buffer;
510                         Page            page;
511
512                         buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
513                                                                                 bstrategy);
514                         LockBuffer(buffer, BUFFER_LOCK_SHARE);
515
516                         page = BufferGetPage(buffer);
517                         if (PageIsAllVisible(page))
518                                 info->bits[blkno] |= (1 << 2);
519
520                         UnlockReleaseBuffer(buffer);
521                 }
522         }
523
524         /* Clean up. */
525         if (vmbuffer != InvalidBuffer)
526                 ReleaseBuffer(vmbuffer);
527         relation_close(rel, AccessShareLock);
528
529         return info;
530 }
531
/*
 * Returns a list of items whose visibility map information does not match
 * the status of the tuples on the page.
 *
 * If all_visible is passed as true, this will include all items which are
 * on pages marked as all-visible in the visibility map but which do not
 * seem to in fact be all-visible.
 *
 * If all_frozen is passed as true, this will include all items which are
 * on pages marked as all-frozen but which do not seem to in fact be frozen.
 *
 * Checks relkind of relid and will throw an error if the relation does not
 * have a VM.
 */
static corrupt_items *
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
	Relation	rel;
	BlockNumber nblocks;
	corrupt_items *items;
	BlockNumber blkno;
	Buffer		vmbuffer = InvalidBuffer;
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
	TransactionId OldestXmin = InvalidTransactionId;

	if (all_visible)
	{
		/* Don't pass rel; that will fail in recovery. */
		OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
	}

	rel = relation_open(relid, AccessShareLock);

	/* Only some relkinds have a visibility map */
	check_relation_relkind(rel);

	nblocks = RelationGetNumberOfBlocks(rel);

	/*
	 * Guess an initial array size. We don't expect many corrupted tuples, so
	 * start with a small array.  This function uses the "next" field to track
	 * the next offset where we can store an item (which is the same thing as
	 * the number of items found so far) and the "count" field to track the
	 * number of entries allocated.  We'll repurpose these fields before
	 * returning.
	 */
	items = palloc0(sizeof(corrupt_items));
	items->next = 0;
	items->count = 64;
	items->tids = palloc(items->count * sizeof(ItemPointerData));

	/* Loop over every block in the relation. */
	for (blkno = 0; blkno < nblocks; ++blkno)
	{
		bool		check_frozen = false;
		bool		check_visible = false;
		Buffer		buffer;
		Page		page;
		OffsetNumber offnum,
					maxoff;

		/* Make sure we are interruptible. */
		CHECK_FOR_INTERRUPTS();

		/* Use the visibility map to decide whether to check this page. */
		if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = true;
		if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = true;
		if (!check_visible && !check_frozen)
			continue;

		/* Read and lock the page. */
		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
									bstrategy);
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		page = BufferGetPage(buffer);
		maxoff = PageGetMaxOffsetNumber(page);

		/*
		 * The visibility map bits might have changed while we were acquiring
		 * the page lock.  Recheck to avoid returning spurious results.
		 */
		if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
			check_frozen = false;
		if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
			check_visible = false;
		if (!check_visible && !check_frozen)
		{
			UnlockReleaseBuffer(buffer);
			continue;
		}

		/* Iterate over each tuple on the page. */
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			HeapTupleData tuple;
			ItemId		itemid;

			itemid = PageGetItemId(page, offnum);

			/* Unused or redirect line pointers are of no interest. */
			if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
				continue;

			/* Dead line pointers are neither all-visible nor frozen. */
			if (ItemIdIsDead(itemid))
			{
				ItemPointerSet(&(tuple.t_self), blkno, offnum);
				record_corrupt_item(items, &tuple.t_self);
				continue;
			}

			/* Initialize a HeapTupleData structure for checks below. */
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			tuple.t_tableOid = relid;

			/*
			 * If we're checking whether the page is all-visible, we expect
			 * the tuple to be all-visible.
			 */
			if (check_visible &&
				!tuple_all_visible(&tuple, OldestXmin, buffer))
			{
				TransactionId RecomputedOldestXmin;

				/*
				 * Time has passed since we computed OldestXmin, so it's
				 * possible that this tuple is all-visible in reality even
				 * though it doesn't appear so based on our
				 * previously-computed value.  Let's compute a new value so we
				 * can be certain whether there is a problem.
				 *
				 * From a concurrency point of view, it sort of sucks to
				 * retake ProcArrayLock here while we're holding a lock on the
				 * buffer, but it should be safe against deadlocks, because
				 * surely GetOldestXmin() should never take a buffer lock. And
				 * this shouldn't happen often, so it's worth being careful so
				 * as to avoid false positives.
				 */
				RecomputedOldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);

				/*
				 * If the horizon didn't advance, the tuple really does look
				 * corrupt.  Otherwise retry the check with the newer horizon
				 * before deciding.
				 */
				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
					record_corrupt_item(items, &tuple.t_self);
				else
				{
					OldestXmin = RecomputedOldestXmin;
					if (!tuple_all_visible(&tuple, OldestXmin, buffer))
						record_corrupt_item(items, &tuple.t_self);
				}
			}

			/*
			 * If we're checking whether the page is all-frozen, we expect the
			 * tuple to be in a state where it will never need freezing.
			 */
			if (check_frozen)
			{
				if (heap_tuple_needs_eventual_freeze(tuple.t_data))
					record_corrupt_item(items, &tuple.t_self);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	/* Clean up. */
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);
	relation_close(rel, AccessShareLock);

	/*
	 * Before returning, repurpose the fields to match caller's expectations.
	 * next is now the next item that should be read (rather than written) and
	 * count is now the number of items we wrote (rather than the number we
	 * allocated).
	 */
	items->count = items->next;
	items->next = 0;

	return items;
}
719
720 /*
721  * Remember one corrupt item.
722  */
723 static void
724 record_corrupt_item(corrupt_items *items, ItemPointer tid)
725 {
726         /* enlarge output array if needed. */
727         if (items->next >= items->count)
728         {
729                 items->count *= 2;
730                 items->tids = repalloc(items->tids,
731                                                            items->count * sizeof(ItemPointerData));
732         }
733         /* and add the new item */
734         items->tids[items->next++] = *tid;
735 }
736
737 /*
738  * Check whether a tuple is all-visible relative to a given OldestXmin value.
739  * The buffer should contain the tuple and should be locked and pinned.
740  */
741 static bool
742 tuple_all_visible(HeapTuple tup, TransactionId OldestXmin, Buffer buffer)
743 {
744         HTSV_Result state;
745         TransactionId xmin;
746
747         state = HeapTupleSatisfiesVacuum(tup, OldestXmin, buffer);
748         if (state != HEAPTUPLE_LIVE)
749                 return false;                   /* all-visible implies live */
750
751         /*
752          * Neither lazy_scan_heap nor heap_page_is_all_visible will mark a page
753          * all-visible unless every tuple is hinted committed. However, those hint
754          * bits could be lost after a crash, so we can't be certain that they'll
755          * be set here.  So just check the xmin.
756          */
757
758         xmin = HeapTupleHeaderGetXmin(tup->t_data);
759         if (!TransactionIdPrecedes(xmin, OldestXmin))
760                 return false;                   /* xmin not old enough for all to see */
761
762         return true;
763 }
764
765 /*
766  * check_relation_relkind - convenience routine to check that relation
767  * is of the relkind supported by the callers
768  */
769 static void
770 check_relation_relkind(Relation rel)
771 {
772         if (rel->rd_rel->relkind != RELKIND_RELATION &&
773                 rel->rd_rel->relkind != RELKIND_MATVIEW &&
774                 rel->rd_rel->relkind != RELKIND_TOASTVALUE)
775                 ereport(ERROR,
776                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
777                    errmsg("\"%s\" is not a table, materialized view, or TOAST table",
778                                   RelationGetRelationName(rel))));
779 }