From: Bruce Momjian Date: Tue, 8 Jun 2004 13:59:36 +0000 (+0000) Subject: vacuum.c refactoring X-Git-Tag: REL8_0_0BETA1~415 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5e926cbb6b82d6532ebdc1e4f6856a6a90d4f9fa;p=postgresql vacuum.c refactoring . rename variables . cur_buffer -> dst_buffer . ToPage -> dst_page . cur_page -> dst_vacpage . move variable declarations into block where variable is used . various Asserts instead of elog(ERROR, ...) . extract functionality from repair_frag() into new routines . move_chain_tuple() . move_plain_tuple() . update_hint_bits() . create type ExecContext . add comments Manfred Koizar --- diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 68164a9a80..80a021487f 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -13,7 +13,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.280 2004/06/05 19:48:07 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.281 2004/06/08 13:59:36 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -99,6 +99,64 @@ typedef struct VRelStats VTupleLink vtlinks; } VRelStats; +/*---------------------------------------------------------------------- + * ExecContext: + * + * As these variables always appear together, we put them into one struct + * and pull initialization and cleanup into separate routines. + * ExecContext is used by repair_frag() and move_xxx_tuple(). More + * accurately: It is *used* only in move_xxx_tuple(), but because this + * routine is called many times, we initialize the struct just once in + * repair_frag() and pass it on to move_xxx_tuple(). + */ +typedef struct ExecContextData +{ + ResultRelInfo *resultRelInfo; + EState *estate; + TupleTable tupleTable; + TupleTableSlot *slot; +} ExecContextData; +typedef ExecContextData *ExecContext; + +static void +ExecContext_Init(ExecContext ec, Relation rel) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + + /* + * We need a ResultRelInfo and an EState so we can use the regular + * executor's index-entry-making machinery. + */ + ec->estate = CreateExecutorState(); + + ec->resultRelInfo = makeNode(ResultRelInfo); + ec->resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ + ec->resultRelInfo->ri_RelationDesc = rel; + ec->resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */ + + ExecOpenIndices(ec->resultRelInfo); + + ec->estate->es_result_relations = ec->resultRelInfo; + ec->estate->es_num_result_relations = 1; + ec->estate->es_result_relation_info = ec->resultRelInfo; + + /* Set up a dummy tuple table too */ + ec->tupleTable = ExecCreateTupleTable(1); + ec->slot = ExecAllocTableSlot(ec->tupleTable); + ExecSetSlotDescriptor(ec->slot, tupdesc, false); +} + +static void +ExecContext_Finish(ExecContext ec) +{ + ExecDropTupleTable(ec->tupleTable, true); + ExecCloseIndices(ec->resultRelInfo); + FreeExecutorState(ec->estate); +} +/* + * End of ExecContext Implementation + *---------------------------------------------------------------------- + */ static MemoryContext vac_context = NULL; @@ -122,6 +180,17 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel, static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindexes, Relation *Irel); +static void move_chain_tuple(Relation rel, + Buffer old_buf, Page old_page, HeapTuple old_tup, + Buffer dst_buf, Page dst_page, VacPage dst_vacpage, + ExecContext ec, ItemPointer ctid, bool cleanVpd); +static void move_plain_tuple(Relation rel, + Buffer old_buf, Page old_page, HeapTuple old_tup, + Buffer dst_buf, Page dst_page, VacPage dst_vacpage, + ExecContext ec); +static void update_hint_bits(Relation rel, VacPageList fraged_pages, + int num_fraged_pages, BlockNumber last_move_dest_block, + int num_moved); static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacpagelist); static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage); @@ -675,7 +744,7 @@ vac_update_dbstats(Oid dbid, static void vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID) { - TransactionId myXID; + TransactionId myXID = GetCurrentTransactionId(); Relation relation; HeapScanDesc scan; HeapTuple tuple; @@ -683,7 +752,6 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID) bool vacuumAlreadyWrapped = false; bool frozenAlreadyWrapped = false; - myXID = GetCurrentTransactionId(); relation = heap_openr(DatabaseRelationName, AccessShareLock); @@ -1059,17 +1127,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, { BlockNumber nblocks, blkno; - ItemId itemid; - Buffer buf; HeapTupleData tuple; - OffsetNumber offnum, - maxoff; - bool pgchanged, - tupgone, - notup; char *relname; - VacPage vacpage, - vacpagecopy; + VacPage vacpage; BlockNumber empty_pages, empty_end_pages; double num_tuples, @@ -1080,7 +1140,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, usable_free_space; Size min_tlen = MaxTupleSize; Size max_tlen = 0; - int i; bool do_shrinking = true; VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData)); int num_vtlinks = 0; @@ -1113,6 +1172,11 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, tempPage = NULL; bool do_reap, do_frag; + Buffer buf; + OffsetNumber offnum, + maxoff; + bool pgchanged, + notup; vacuum_delay_point(); @@ -1125,6 +1189,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, if (PageIsNew(page)) { + VacPage vacpagecopy; + ereport(WARNING, (errmsg("relation \"%s\" page %u is uninitialized --- fixing", relname, blkno))); @@ -1142,6 +1208,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, if (PageIsEmpty(page)) { + VacPage vacpagecopy; + vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_space += vacpage->free; empty_pages++; @@ -1161,8 +1229,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, offnum = OffsetNumberNext(offnum)) { uint16 sv_infomask; - - itemid = PageGetItemId(page, offnum); + ItemId itemid = PageGetItemId(page, offnum); + bool tupgone = false; /* * Collect un-used items too - it's possible to have indexes @@ -1180,7 +1248,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); - tupgone = false; sv_infomask = tuple.t_data->t_infomask; switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin)) @@ -1269,7 +1336,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, do_shrinking = false; break; default: - elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result"); + /* unexpected HeapTupleSatisfiesVacuum result */ + Assert(false); break; } @@ -1344,7 +1412,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, if (do_reap || do_frag) { - vacpagecopy = copy_vac_page(vacpage); + VacPage vacpagecopy = copy_vac_page(vacpage); if (do_reap) vpage_insert(vacuum_pages, vacpagecopy); if (do_frag) @@ -1390,6 +1458,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, */ if (do_shrinking) { + int i; + Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages); fraged_pages->num_pages -= empty_end_pages; usable_free_space = 0; @@ -1453,76 +1523,29 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindexes, Relation *Irel) { - TransactionId myXID; - CommandId myCID; - Buffer buf, - cur_buffer; + TransactionId myXID = GetCurrentTransactionId(); + Buffer dst_buffer = InvalidBuffer; BlockNumber nblocks, blkno; BlockNumber last_move_dest_block = 0, last_vacuum_block; - Page page, - ToPage = NULL; - OffsetNumber offnum, - maxoff, - newoff, - max_offset; - ItemId itemid, - newitemid; - HeapTupleData tuple, - newtup; - TupleDesc tupdesc; - ResultRelInfo *resultRelInfo; - EState *estate; - TupleTable tupleTable; - TupleTableSlot *slot; + Page dst_page = NULL; + ExecContextData ec; VacPageListData Nvacpagelist; - VacPage cur_page = NULL, + VacPage dst_vacpage = NULL, last_vacuum_page, vacpage, *curpage; - int cur_item = 0; int i; - Size tuple_len; - int num_moved, + int num_moved = 0, num_fraged_pages, vacuumed_pages; - int checked_moved, - num_tuples, - keep_tuples = 0; - bool isempty, - dowrite, - chain_tuple_moved; + int keep_tuples = 0; VacRUsage ru0; vac_init_rusage(&ru0); - myXID = GetCurrentTransactionId(); - myCID = GetCurrentCommandId(); - - tupdesc = RelationGetDescr(onerel); - - /* - * We need a ResultRelInfo and an EState so we can use the regular - * executor's index-entry-making machinery. - */ - estate = CreateExecutorState(); - - resultRelInfo = makeNode(ResultRelInfo); - resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ - resultRelInfo->ri_RelationDesc = onerel; - resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */ - - ExecOpenIndices(resultRelInfo); - - estate->es_result_relations = resultRelInfo; - estate->es_num_result_relations = 1; - estate->es_result_relation_info = resultRelInfo; - - /* Set up a dummy tuple table too */ - tupleTable = ExecCreateTupleTable(1); - slot = ExecAllocTableSlot(tupleTable); - ExecSetSlotDescriptor(slot, tupdesc, false); + ExecContext_Init(&ec, onerel); Nvacpagelist.num_pages = 0; num_fraged_pages = fraged_pages->num_pages; @@ -1539,8 +1562,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, last_vacuum_page = NULL; last_vacuum_block = InvalidBlockNumber; } - cur_buffer = InvalidBuffer; - num_moved = 0; vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber)); vacpage->offsets_used = vacpage->offsets_free = 0; @@ -1560,6 +1581,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, blkno > last_move_dest_block; blkno--) { + Buffer buf; + Page page; + OffsetNumber offnum, + maxoff; + bool isempty, + dowrite, + chain_tuple_moved; + vacuum_delay_point(); /* @@ -1635,7 +1664,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { - itemid = PageGetItemId(page, offnum); + Size tuple_len; + HeapTupleData tuple; + ItemId itemid = PageGetItemId(page, offnum); if (!ItemIdIsUsed(itemid)) continue; @@ -1645,45 +1676,71 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, tuple_len = tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); + /* + * VACUUM FULL has an exclusive lock on the relation. So + * normally no other transaction can have pending INSERTs or + * DELETEs in this relation. A tuple is either + * (a) a tuple in a system catalog, inserted or deleted by + * a not yet committed transaction or + * (b) dead (XMIN_INVALID or XMAX_COMMITTED) or + * (c) inserted by a committed xact (XMIN_COMMITTED) or + * (d) moved by the currently running VACUUM. + * In case (a) we wouldn't be in repair_frag() at all. + * In case (b) we cannot be here, because scan_heap() has + * already marked the item as unused, see continue above. + * Case (c) is what normally is to be expected. + * Case (d) is only possible, if a whole tuple chain has been + * moved while processing this or a higher numbered block. + */ if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { - if (tuple.t_data->t_infomask & HEAP_MOVED_IN) - elog(ERROR, "HEAP_MOVED_IN was not expected"); + /* + * There cannot be another concurrently running VACUUM. If + * the tuple had been moved in by a previous VACUUM, the + * visibility check would have set XMIN_COMMITTED. If the + * tuple had been moved in by the currently running VACUUM, + * the loop would have been terminated. We had + * elog(ERROR, ...) here, but as we are testing for a + * can't-happen condition, Assert() seems more appropriate. + */ + Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN)); /* * If this (chain) tuple is moved by me already then I * have to check is it in vacpage or not - i.e. is it * moved while cleaning this page or some previous one. */ - if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) - { - if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID) - elog(ERROR, "invalid XVAC in tuple header"); - if (keep_tuples == 0) - continue; - if (chain_tuple_moved) /* some chains was moved - * while */ - { /* cleaning this page */ - Assert(vacpage->offsets_free > 0); - for (i = 0; i < vacpage->offsets_free; i++) - { - if (vacpage->offsets[i] == offnum) - break; - } - if (i >= vacpage->offsets_free) /* not found */ - { - vacpage->offsets[vacpage->offsets_free++] = offnum; - keep_tuples--; - } + Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF); + /* + * MOVED_OFF by another VACUUM would have caused the + * visibility check to set XMIN_COMMITTED or XMIN_INVALID. + */ + Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID); + + /* Can't we Assert(keep_tuples > 0) here? */ + if (keep_tuples == 0) + continue; + if (chain_tuple_moved) /* some chains was moved + * while */ + { /* cleaning this page */ + Assert(vacpage->offsets_free > 0); + for (i = 0; i < vacpage->offsets_free; i++) + { + if (vacpage->offsets[i] == offnum) + break; } - else + if (i >= vacpage->offsets_free) /* not found */ { vacpage->offsets[vacpage->offsets_free++] = offnum; keep_tuples--; } - continue; } - elog(ERROR, "HEAP_MOVED_OFF was expected"); + else + { + vacpage->offsets[vacpage->offsets_free++] = offnum; + keep_tuples--; + } + continue; } /* @@ -1716,8 +1773,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, Buffer Cbuf = buf; bool freeCbuf = false; bool chain_move_failed = false; - Page Cpage; - ItemId Citemid; ItemPointerData Ctid; HeapTupleData tp = tuple; Size tlen = tuple_len; @@ -1728,10 +1783,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, int to_item = 0; int ti; - if (cur_buffer != InvalidBuffer) + if (dst_buffer != InvalidBuffer) { - WriteBuffer(cur_buffer); - cur_buffer = InvalidBuffer; + WriteBuffer(dst_buffer); + dst_buffer = InvalidBuffer; } /* Quick exit if we have no vtlinks to search in */ @@ -1754,6 +1809,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid)))) { + Page Cpage; + ItemId Citemid; + ItemPointerData Ctid; + Ctid = tp.t_data->t_ctid; if (freeCbuf) ReleaseBuffer(Cbuf); @@ -1929,12 +1988,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, } /* - * Okay, move the whle tuple chain + * Okay, move the whole tuple chain */ ItemPointerSetInvalid(&Ctid); for (ti = 0; ti < num_vtmove; ti++) { VacPage destvacpage = vtmove[ti].vacpage; + Page Cpage; + ItemId Citemid; /* Get page to move from */ tuple.t_self = vtmove[ti].tid; @@ -1942,13 +2003,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ItemPointerGetBlockNumber(&(tuple.t_self))); /* Get page to move to */ - cur_buffer = ReadBuffer(onerel, destvacpage->blkno); + dst_buffer = ReadBuffer(onerel, destvacpage->blkno); - LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); - if (cur_buffer != Cbuf) + LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE); + if (dst_buffer != Cbuf) LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE); - ToPage = BufferGetPage(cur_buffer); + dst_page = BufferGetPage(dst_buffer); Cpage = BufferGetPage(Cbuf); Citemid = PageGetItemId(Cpage, @@ -1961,120 +2022,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, * make a copy of the source tuple, and then mark the * source tuple MOVED_OFF. */ - heap_copytuple_with_tuple(&tuple, &newtup); - - /* - * register invalidation of source tuple in catcaches. - */ - CacheInvalidateHeapTuple(onerel, &tuple); - - /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */ - START_CRIT_SECTION(); - - tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | - HEAP_XMIN_INVALID | - HEAP_MOVED_IN); - tuple.t_data->t_infomask |= HEAP_MOVED_OFF; - HeapTupleHeaderSetXvac(tuple.t_data, myXID); - - /* - * If this page was not used before - clean it. - * - * NOTE: a nasty bug used to lurk here. It is possible - * for the source and destination pages to be the same - * (since this tuple-chain member can be on a page - * lower than the one we're currently processing in - * the outer loop). If that's true, then after - * vacuum_page() the source tuple will have been - * moved, and tuple.t_data will be pointing at - * garbage. Therefore we must do everything that uses - * tuple.t_data BEFORE this step!! - * - * This path is different from the other callers of - * vacuum_page, because we have already incremented - * the vacpage's offsets_used field to account for the - * tuple(s) we expect to move onto the page. Therefore - * vacuum_page's check for offsets_used == 0 is wrong. - * But since that's a good debugging check for all - * other callers, we work around it here rather than - * remove it. - */ - if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd) - { - int sv_offsets_used = destvacpage->offsets_used; - - destvacpage->offsets_used = 0; - vacuum_page(onerel, cur_buffer, destvacpage); - destvacpage->offsets_used = sv_offsets_used; - } - - /* - * Update the state of the copied tuple, and store it - * on the destination page. - */ - newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | - HEAP_XMIN_INVALID | - HEAP_MOVED_OFF); - newtup.t_data->t_infomask |= HEAP_MOVED_IN; - HeapTupleHeaderSetXvac(newtup.t_data, myXID); - newoff = PageAddItem(ToPage, - (Item) newtup.t_data, - tuple_len, - InvalidOffsetNumber, - LP_USED); - if (newoff == InvalidOffsetNumber) - { - elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain", - (unsigned long) tuple_len, destvacpage->blkno); - } - newitemid = PageGetItemId(ToPage, newoff); - pfree(newtup.t_data); - newtup.t_datamcxt = NULL; - newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); - ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff); - - /* XLOG stuff */ - if (!onerel->rd_istemp) - { - XLogRecPtr recptr = - log_heap_move(onerel, Cbuf, tuple.t_self, - cur_buffer, &newtup); - - if (Cbuf != cur_buffer) - { - PageSetLSN(Cpage, recptr); - PageSetSUI(Cpage, ThisStartUpID); - } - PageSetLSN(ToPage, recptr); - PageSetSUI(ToPage, ThisStartUpID); - } - else - { - /* - * No XLOG record, but still need to flag that XID - * exists on disk - */ - MyXactMadeTempRelUpdate = true; - } - - END_CRIT_SECTION(); + move_chain_tuple(onerel, Cbuf, Cpage, &tuple, + dst_buffer, dst_page, destvacpage, + &ec, &Ctid, vtmove[ti].cleanVpd); + num_moved++; if (destvacpage->blkno > last_move_dest_block) last_move_dest_block = destvacpage->blkno; - /* - * Set new tuple's t_ctid pointing to itself for last - * tuple in chain, and to next tuple in chain - * otherwise. - */ - if (!ItemPointerIsValid(&Ctid)) - newtup.t_data->t_ctid = newtup.t_self; - else - newtup.t_data->t_ctid = Ctid; - Ctid = newtup.t_self; - - num_moved++; - /* * Remember that we moved tuple from the current page * (corresponding index tuple will be cleaned). @@ -2085,23 +2040,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, else keep_tuples++; - LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK); - if (cur_buffer != Cbuf) - LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK); - - /* Create index entries for the moved tuple */ - if (resultRelInfo->ri_NumIndices > 0) - { - ExecStoreTuple(&newtup, slot, InvalidBuffer, false); - ExecInsertIndexTuples(slot, &(newtup.t_self), - estate, true); - } - - WriteBuffer(cur_buffer); + WriteBuffer(dst_buffer); WriteBuffer(Cbuf); } /* end of move-the-tuple-chain loop */ - cur_buffer = InvalidBuffer; + dst_buffer = InvalidBuffer; pfree(vtmove); chain_tuple_moved = true; @@ -2110,13 +2053,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, } /* end of is-tuple-in-chain test */ /* try to find new page for this tuple */ - if (cur_buffer == InvalidBuffer || - !enough_space(cur_page, tuple_len)) + if (dst_buffer == InvalidBuffer || + !enough_space(dst_vacpage, tuple_len)) { - if (cur_buffer != InvalidBuffer) + if (dst_buffer != InvalidBuffer) { - WriteBuffer(cur_buffer); - cur_buffer = InvalidBuffer; + WriteBuffer(dst_buffer); + dst_buffer = InvalidBuffer; } for (i = 0; i < num_fraged_pages; i++) { @@ -2125,110 +2068,32 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, } if (i == num_fraged_pages) break; /* can't move item anywhere */ - cur_item = i; - cur_page = fraged_pages->pagedesc[cur_item]; - cur_buffer = ReadBuffer(onerel, cur_page->blkno); - LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); - ToPage = BufferGetPage(cur_buffer); + dst_vacpage = fraged_pages->pagedesc[i]; + dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno); + LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE); + dst_page = BufferGetPage(dst_buffer); /* if this page was not used before - clean it */ - if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0) - vacuum_page(onerel, cur_buffer, cur_page); + if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0) + vacuum_page(onerel, dst_buffer, dst_vacpage); } else - LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - /* copy tuple */ - heap_copytuple_with_tuple(&tuple, &newtup); + move_plain_tuple(onerel, buf, page, &tuple, + dst_buffer, dst_page, dst_vacpage, &ec); - /* - * register invalidation of source tuple in catcaches. - * - * (Note: we do not need to register the copied tuple, because we - * are not changing the tuple contents and so there cannot be - * any need to flush negative catcache entries.) - */ - CacheInvalidateHeapTuple(onerel, &tuple); - - /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */ - START_CRIT_SECTION(); - /* - * Mark new tuple as MOVED_IN by me. - */ - newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | - HEAP_XMIN_INVALID | - HEAP_MOVED_OFF); - newtup.t_data->t_infomask |= HEAP_MOVED_IN; - HeapTupleHeaderSetXvac(newtup.t_data, myXID); - - /* add tuple to the page */ - newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len, - InvalidOffsetNumber, LP_USED); - if (newoff == InvalidOffsetNumber) - { - elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)", - (unsigned long) tuple_len, - cur_page->blkno, (unsigned long) cur_page->free, - cur_page->offsets_used, cur_page->offsets_free); - } - newitemid = PageGetItemId(ToPage, newoff); - pfree(newtup.t_data); - newtup.t_datamcxt = NULL; - newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); - ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff); - newtup.t_self = newtup.t_data->t_ctid; + num_moved++; + if (dst_vacpage->blkno > last_move_dest_block) + last_move_dest_block = dst_vacpage->blkno; /* - * Mark old tuple as MOVED_OFF by me. + * Remember that we moved tuple from the current page + * (corresponding index tuple will be cleaned). */ - tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | - HEAP_XMIN_INVALID | - HEAP_MOVED_IN); - tuple.t_data->t_infomask |= HEAP_MOVED_OFF; - HeapTupleHeaderSetXvac(tuple.t_data, myXID); - - /* XLOG stuff */ - if (!onerel->rd_istemp) - { - XLogRecPtr recptr = - log_heap_move(onerel, buf, tuple.t_self, - cur_buffer, &newtup); - - PageSetLSN(page, recptr); - PageSetSUI(page, ThisStartUpID); - PageSetLSN(ToPage, recptr); - PageSetSUI(ToPage, ThisStartUpID); - } - else - { - /* - * No XLOG record, but still need to flag that XID exists - * on disk - */ - MyXactMadeTempRelUpdate = true; - } - - END_CRIT_SECTION(); - - cur_page->offsets_used++; - num_moved++; - cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower; - if (cur_page->blkno > last_move_dest_block) - last_move_dest_block = cur_page->blkno; - vacpage->offsets[vacpage->offsets_free++] = offnum; - - LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK); - LockBuffer(buf, BUFFER_LOCK_UNLOCK); - - /* insert index' tuples if needed */ - if (resultRelInfo->ri_NumIndices > 0) - { - ExecStoreTuple(&newtup, slot, InvalidBuffer, false); - ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true); - } } /* walk along page */ /* @@ -2249,36 +2114,31 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, off <= maxoff; off = OffsetNumberNext(off)) { - itemid = PageGetItemId(page, off); + ItemId itemid = PageGetItemId(page, off); + HeapTupleHeader htup; + if (!ItemIdIsUsed(itemid)) continue; - tuple.t_datamcxt = NULL; - tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); - if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED) + htup = (HeapTupleHeader) PageGetItem(page, itemid); + if (htup->t_infomask & HEAP_XMIN_COMMITTED) continue; - if (tuple.t_data->t_infomask & HEAP_MOVED_IN) - elog(ERROR, "HEAP_MOVED_IN was not expected"); - if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) + /* + ** See comments in the walk-along-page loop above, why we + ** have Asserts here instead of if (...) elog(ERROR). + */ + Assert(!(htup->t_infomask & HEAP_MOVED_IN)); + Assert(htup->t_infomask & HEAP_MOVED_OFF); + Assert(HeapTupleHeaderGetXvac(htup) == myXID); + if (chain_tuple_moved) { - if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID) - elog(ERROR, "invalid XVAC in tuple header"); - /* some chains was moved while */ - if (chain_tuple_moved) - { /* cleaning this page */ - Assert(vacpage->offsets_free > 0); - for (i = 0; i < vacpage->offsets_free; i++) - { - if (vacpage->offsets[i] == off) - break; - } - if (i >= vacpage->offsets_free) /* not found */ - { - vacpage->offsets[vacpage->offsets_free++] = off; - Assert(keep_tuples > 0); - keep_tuples--; - } + /* some chains was moved while cleaning this page */ + Assert(vacpage->offsets_free > 0); + for (i = 0; i < vacpage->offsets_free; i++) + { + if (vacpage->offsets[i] == off) + break; } - else + if (i >= vacpage->offsets_free) /* not found */ { vacpage->offsets[vacpage->offsets_free++] = off; Assert(keep_tuples > 0); @@ -2286,7 +2146,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, } } else - elog(ERROR, "HEAP_MOVED_OFF was expected"); + { + vacpage->offsets[vacpage->offsets_free++] = off; + Assert(keep_tuples > 0); + keep_tuples--; + } } } @@ -2312,10 +2176,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, blkno++; /* new number of blocks */ - if (cur_buffer != InvalidBuffer) + if (dst_buffer != InvalidBuffer) { Assert(num_moved > 0); - WriteBuffer(cur_buffer); + WriteBuffer(dst_buffer); } if (num_moved > 0) @@ -2348,6 +2212,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, Assert((*curpage)->blkno < blkno); if ((*curpage)->offsets_used == 0) { + Buffer buf; + Page page; + /* this page was not used as a move target, so must clean it */ buf = ReadBuffer(onerel, (*curpage)->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); @@ -2363,62 +2230,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, * Now scan all the pages that we moved tuples onto and update tuple * status bits. This is not really necessary, but will save time for * future transactions examining these tuples. - * - * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from - * pages that were move source pages but not move dest pages. One - * also wonders whether it wouldn't be better to skip this step and - * let the tuple status updates happen someplace that's not holding an - * exclusive lock on the relation. */ - checked_moved = 0; - for (i = 0, curpage = fraged_pages->pagedesc; - i < num_fraged_pages; - i++, curpage++) - { - vacuum_delay_point(); - - Assert((*curpage)->blkno < blkno); - if ((*curpage)->blkno > last_move_dest_block) - break; /* no need to scan any further */ - if ((*curpage)->offsets_used == 0) - continue; /* this page was never used as a move dest */ - buf = ReadBuffer(onerel, (*curpage)->blkno); - LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - page = BufferGetPage(buf); - num_tuples = 0; - max_offset = PageGetMaxOffsetNumber(page); - for (newoff = FirstOffsetNumber; - newoff <= max_offset; - newoff = OffsetNumberNext(newoff)) - { - itemid = PageGetItemId(page, newoff); - if (!ItemIdIsUsed(itemid)) - continue; - tuple.t_datamcxt = NULL; - tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); - if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) - { - if (!(tuple.t_data->t_infomask & HEAP_MOVED)) - elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected"); - if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID) - elog(ERROR, "invalid XVAC in tuple header"); - if (tuple.t_data->t_infomask & HEAP_MOVED_IN) - { - tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; - tuple.t_data->t_infomask &= ~HEAP_MOVED; - num_tuples++; - } - else - tuple.t_data->t_infomask |= HEAP_XMIN_INVALID; - } - } - LockBuffer(buf, BUFFER_LOCK_UNLOCK); - WriteBuffer(buf); - Assert((*curpage)->offsets_used == num_tuples); - checked_moved += num_tuples; - } - Assert(num_moved == checked_moved); - + update_hint_bits(onerel, fraged_pages, num_fraged_pages, + last_move_dest_block, num_moved); + /* * It'd be cleaner to make this report at the bottom of this routine, * but then the rusage would double-count the second pass of index @@ -2455,6 +2270,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, *vpleft = *vpright; *vpright = vpsave; } + /* + * keep_tuples is the number of tuples that have been moved + * off a page during chain moves but not been scanned over + * subsequently. The tuple ids of these tuples are not + * recorded as free offsets for any VacPage, so they will not + * be cleared from the indexes. + */ Assert(keep_tuples >= 0); for (i = 0; i < nindexes; i++) vacuum_index(&Nvacpagelist, Irel[i], @@ -2465,36 +2287,41 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, if (vacpage->blkno == (blkno - 1) && vacpage->offsets_free > 0) { - OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)]; - int uncnt; + Buffer buf; + Page page; + OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)]; + OffsetNumber offnum, + maxoff; + int uncnt; + int num_tuples = 0; buf = ReadBuffer(onerel, vacpage->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); - num_tuples = 0; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { - itemid = PageGetItemId(page, offnum); + ItemId itemid = PageGetItemId(page, offnum); + HeapTupleHeader htup; + if (!ItemIdIsUsed(itemid)) continue; - tuple.t_datamcxt = NULL; - tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); + htup = (HeapTupleHeader) PageGetItem(page, itemid); + if (htup->t_infomask & HEAP_XMIN_COMMITTED) + continue; - if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) - { - if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) - { - if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID) - elog(ERROR, "invalid XVAC in tuple header"); - itemid->lp_flags &= ~LP_USED; - num_tuples++; - } - else - elog(ERROR, "HEAP_MOVED_OFF was expected"); - } + /* + ** See comments in the walk-along-page loop above, why we + ** have Asserts here instead of if (...) elog(ERROR). + */ + Assert(!(htup->t_infomask & HEAP_MOVED_IN)); + Assert(htup->t_infomask & HEAP_MOVED_OFF); + Assert(HeapTupleHeaderGetXvac(htup) == myXID); + + itemid->lp_flags &= ~LP_USED; + num_tuples++; } Assert(vacpage->offsets_free == num_tuples); @@ -2554,11 +2381,337 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, if (vacrelstats->vtlinks != NULL) pfree(vacrelstats->vtlinks); - ExecDropTupleTable(tupleTable, true); + ExecContext_Finish(&ec); +} + +/* + * move_chain_tuple() -- move one tuple that is part of a tuple chain + * + * This routine moves old_tup from old_page to dst_page. + * old_page and dst_page might be the same page. + * On entry old_buf and dst_buf are locked exclusively, both locks (or + * the single lock, if this is a intra-page-move) are released before + * exit. + * + * Yes, a routine with ten parameters is ugly, but it's still better + * than having these 120 lines of code in repair_frag() which is + * already too long and almost unreadable. + */ +static void +move_chain_tuple(Relation rel, + Buffer old_buf, Page old_page, HeapTuple old_tup, + Buffer dst_buf, Page dst_page, VacPage dst_vacpage, + ExecContext ec, ItemPointer ctid, bool cleanVpd) +{ + TransactionId myXID = GetCurrentTransactionId(); + HeapTupleData newtup; + OffsetNumber newoff; + ItemId newitemid; + Size tuple_len = old_tup->t_len; + + heap_copytuple_with_tuple(old_tup, &newtup); + + /* + * register invalidation of source tuple in catcaches. + */ + CacheInvalidateHeapTuple(rel, old_tup); + + /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */ + START_CRIT_SECTION(); + + old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | + HEAP_XMIN_INVALID | + HEAP_MOVED_IN); + old_tup->t_data->t_infomask |= HEAP_MOVED_OFF; + HeapTupleHeaderSetXvac(old_tup->t_data, myXID); + + /* + * If this page was not used before - clean it. + * + * NOTE: a nasty bug used to lurk here. It is possible + * for the source and destination pages to be the same + * (since this tuple-chain member can be on a page + * lower than the one we're currently processing in + * the outer loop). If that's true, then after + * vacuum_page() the source tuple will have been + * moved, and tuple.t_data will be pointing at + * garbage. Therefore we must do everything that uses + * old_tup->t_data BEFORE this step!! + * + * This path is different from the other callers of + * vacuum_page, because we have already incremented + * the vacpage's offsets_used field to account for the + * tuple(s) we expect to move onto the page. Therefore + * vacuum_page's check for offsets_used == 0 is wrong. + * But since that's a good debugging check for all + * other callers, we work around it here rather than + * remove it. + */ + if (!PageIsEmpty(dst_page) && cleanVpd) + { + int sv_offsets_used = dst_vacpage->offsets_used; + + dst_vacpage->offsets_used = 0; + vacuum_page(rel, dst_buf, dst_vacpage); + dst_vacpage->offsets_used = sv_offsets_used; + } + + /* + * Update the state of the copied tuple, and store it + * on the destination page. + */ + newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | + HEAP_XMIN_INVALID | + HEAP_MOVED_OFF); + newtup.t_data->t_infomask |= HEAP_MOVED_IN; + HeapTupleHeaderSetXvac(newtup.t_data, myXID); + newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len, + InvalidOffsetNumber, LP_USED); + if (newoff == InvalidOffsetNumber) + { + elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain", + (unsigned long) tuple_len, dst_vacpage->blkno); + } + newitemid = PageGetItemId(dst_page, newoff); + pfree(newtup.t_data); + newtup.t_datamcxt = NULL; + newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid); + ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff); + + /* XLOG stuff */ + if (!rel->rd_istemp) + { + XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self, + dst_buf, &newtup); - ExecCloseIndices(resultRelInfo); + if (old_buf != dst_buf) + { + PageSetLSN(old_page, recptr); + PageSetSUI(old_page, ThisStartUpID); + } + PageSetLSN(dst_page, recptr); + PageSetSUI(dst_page, ThisStartUpID); + } + else + { + /* + * No XLOG record, but still need to flag that XID + * exists on disk + */ + MyXactMadeTempRelUpdate = true; + } + + END_CRIT_SECTION(); + + /* + * Set new tuple's t_ctid pointing to itself for last + * tuple in chain, and to next tuple in chain + * otherwise. + */ + /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */ + if (!ItemPointerIsValid(ctid)) + newtup.t_data->t_ctid = newtup.t_self; + else + newtup.t_data->t_ctid = *ctid; + *ctid = newtup.t_self; - FreeExecutorState(estate); + LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK); + if (dst_buf != old_buf) + LockBuffer(old_buf, BUFFER_LOCK_UNLOCK); + + /* Create index entries for the moved tuple */ + if (ec->resultRelInfo->ri_NumIndices > 0) + { + ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false); + ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true); + } +} + +/* + * move_plain_tuple() -- move one tuple that is not part of a chain + * + * This routine moves old_tup from old_page to dst_page. + * On entry old_buf and dst_buf are locked exclusively, both locks are + * released before exit. + * + * Yes, a routine with eight parameters is ugly, but it's still better + * than having these 90 lines of code in repair_frag() which is already + * too long and almost unreadable. + */ +static void +move_plain_tuple(Relation rel, + Buffer old_buf, Page old_page, HeapTuple old_tup, + Buffer dst_buf, Page dst_page, VacPage dst_vacpage, + ExecContext ec) +{ + TransactionId myXID = GetCurrentTransactionId(); + HeapTupleData newtup; + OffsetNumber newoff; + ItemId newitemid; + Size tuple_len = old_tup->t_len; + + /* copy tuple */ + heap_copytuple_with_tuple(old_tup, &newtup); + + /* + * register invalidation of source tuple in catcaches. + * + * (Note: we do not need to register the copied tuple, because we + * are not changing the tuple contents and so there cannot be + * any need to flush negative catcache entries.) + */ + CacheInvalidateHeapTuple(rel, old_tup); + + /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */ + START_CRIT_SECTION(); + + /* + * Mark new tuple as MOVED_IN by me. + */ + newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | + HEAP_XMIN_INVALID | + HEAP_MOVED_OFF); + newtup.t_data->t_infomask |= HEAP_MOVED_IN; + HeapTupleHeaderSetXvac(newtup.t_data, myXID); + + /* add tuple to the page */ + newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len, + InvalidOffsetNumber, LP_USED); + if (newoff == InvalidOffsetNumber) + { + elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)", + (unsigned long) tuple_len, + dst_vacpage->blkno, (unsigned long) dst_vacpage->free, + dst_vacpage->offsets_used, dst_vacpage->offsets_free); + } + newitemid = PageGetItemId(dst_page, newoff); + pfree(newtup.t_data); + newtup.t_datamcxt = NULL; + newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid); + ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff); + newtup.t_self = newtup.t_data->t_ctid; + + /* + * Mark old tuple as MOVED_OFF by me. + */ + old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | + HEAP_XMIN_INVALID | + HEAP_MOVED_IN); + old_tup->t_data->t_infomask |= HEAP_MOVED_OFF; + HeapTupleHeaderSetXvac(old_tup->t_data, myXID); + + /* XLOG stuff */ + if (!rel->rd_istemp) + { + XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self, + dst_buf, &newtup); + + PageSetLSN(old_page, recptr); + PageSetSUI(old_page, ThisStartUpID); + PageSetLSN(dst_page, recptr); + PageSetSUI(dst_page, ThisStartUpID); + } + else + { + /* + * No XLOG record, but still need to flag that XID exists + * on disk + */ + MyXactMadeTempRelUpdate = true; + } + + END_CRIT_SECTION(); + + dst_vacpage->free = ((PageHeader) dst_page)->pd_upper - + ((PageHeader) dst_page)->pd_lower; + LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK); + LockBuffer(old_buf, BUFFER_LOCK_UNLOCK); + + dst_vacpage->offsets_used++; + + /* insert index' tuples if needed */ + if (ec->resultRelInfo->ri_NumIndices > 0) + { + ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false); + ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true); + } +} + +/* + * update_hint_bits() -- update hint bits in destination pages + * + * Scan all the pages that we moved tuples onto and update tuple + * status bits. This is not really necessary, but will save time for + * future transactions examining these tuples. + * + * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from + * pages that were move source pages but not move dest pages. One + * also wonders whether it wouldn't be better to skip this step and + * let the tuple status updates happen someplace that's not holding an + * exclusive lock on the relation. + */ +static void +update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages, + BlockNumber last_move_dest_block, int num_moved) +{ + int checked_moved = 0; + int i; + VacPage *curpage; + + for (i = 0, curpage = fraged_pages->pagedesc; + i < num_fraged_pages; + i++, curpage++) + { + Buffer buf; + Page page; + OffsetNumber max_offset; + OffsetNumber off; + int num_tuples = 0; + + vacuum_delay_point(); + + if ((*curpage)->blkno > last_move_dest_block) + break; /* no need to scan any further */ + if ((*curpage)->offsets_used == 0) + continue; /* this page was never used as a move dest */ + buf = ReadBuffer(rel, (*curpage)->blkno); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + page = BufferGetPage(buf); + max_offset = PageGetMaxOffsetNumber(page); + for (off = FirstOffsetNumber; + off <= max_offset; + off = OffsetNumberNext(off)) + { + ItemId itemid = PageGetItemId(page, off); + HeapTupleHeader htup; + + if (!ItemIdIsUsed(itemid)) + continue; + htup = (HeapTupleHeader) PageGetItem(page, itemid); + if (htup->t_infomask & HEAP_XMIN_COMMITTED) + continue; + /* + * See comments in the walk-along-page loop above, why we + * have Asserts here instead of if (...) elog(ERROR). The + * difference here is that we may see MOVED_IN. + */ + Assert(htup->t_infomask & HEAP_MOVED); + Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId()); + if (htup->t_infomask & HEAP_MOVED_IN) + { + htup->t_infomask |= HEAP_XMIN_COMMITTED; + htup->t_infomask &= ~HEAP_MOVED; + num_tuples++; + } + else + htup->t_infomask |= HEAP_XMIN_INVALID; + } + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + WriteBuffer(buf); + Assert((*curpage)->offsets_used == num_tuples); + checked_moved += num_tuples; + } + Assert(num_moved == checked_moved); } /*