+ return result;
+}
+
+/*
+ * _bt_killitems - set LP_DEAD state for items an indexscan caller has
+ * told us were killed
+ *
+ * scan->opaque, referenced locally through so, contains information about the
+ * current page and killed tuples thereon (generally, this should only be
+ * called if so->numKilled > 0).
+ *
+ * The caller must have pin on so->currPos.buf, but may or may not have
+ * read-lock, as indicated by haveLock. Note that we assume read-lock
+ * is sufficient for setting LP_DEAD status (which is only a hint).
+ *
+ * We match items by heap TID before assuming they are the right ones to
+ * delete. We cope with cases where items have moved right due to insertions.
+ * If an item has moved off the current page due to a split, we'll fail to
+ * find it and do nothing (this is not an error case --- we assume the item
+ * will eventually get marked in a future indexscan). Note that because we
+ * hold pin on the target page continuously from initially reading the items
+ * until applying this function, VACUUM cannot have deleted any items from
+ * the page, and so there is no need to search left from the recorded offset.
+ * (This observation also guarantees that the item is still the right one
+ * to delete, which might otherwise be questionable since heap TIDs can get
+ * recycled.)
+ */
+void
+_bt_killitems(IndexScanDesc scan, bool haveLock)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ Page page;
+ BTPageOpaque opaque;
+ OffsetNumber minoff;
+ OffsetNumber maxoff;
+ int i;
+ bool killedsomething = false;
+
+ Assert(BufferIsValid(so->currPos.buf));
+
+ if (!haveLock)
+ LockBuffer(so->currPos.buf, BT_READ);
+
+ page = BufferGetPage(so->currPos.buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ minoff = P_FIRSTDATAKEY(opaque);
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ for (i = 0; i < so->numKilled; i++)
+ {
+ int itemIndex = so->killedItems[i];
+ BTScanPosItem *kitem = &so->currPos.items[itemIndex];
+ OffsetNumber offnum = kitem->indexOffset;
+
+ Assert(itemIndex >= so->currPos.firstItem &&
+ itemIndex <= so->currPos.lastItem);
+ if (offnum < minoff)
+ continue; /* pure paranoia */
+ while (offnum <= maxoff)
+ {
+ ItemId iid = PageGetItemId(page, offnum);
+ IndexTuple ituple = (IndexTuple) PageGetItem(page, iid);
+
+ if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
+ {
+ /* found the item */
+ ItemIdMarkDead(iid);
+ killedsomething = true;
+ break; /* out of inner search loop */
+ }
+ offnum = OffsetNumberNext(offnum);
+ }
+ }
+
+ /*
+ * Since this can be redone later if needed, it's treated the same as a
+ * commit-hint-bit status update for heap tuples: we mark the buffer dirty
+ * but don't make a WAL log entry.
+ *
+ * Whenever we mark anything LP_DEAD, we also set the page's
+ * BTP_HAS_GARBAGE flag, which is likewise just a hint.
+ */
+ if (killedsomething)
+ {
+ opaque->btpo_flags |= BTP_HAS_GARBAGE;
+ SetBufferCommitInfoNeedsSave(so->currPos.buf);
+ }
+
+ if (!haveLock)
+ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Always reset the scan state, so we don't look for the same items on other
+ * pages.
+ */
+ so->numKilled = 0;
+}
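+
+/*
+ * For context, a minimal sketch of how the scan-level caller (e.g.
+ * btgettuple) is expected to feed this routine; treat the exact field
+ * handling as illustrative rather than the caller's literal coding:
+ *
+ * if (scan->kill_prior_tuple)
+ * {
+ * if (so->killedItems == NULL)
+ * so->killedItems = (int *) palloc(MaxIndexTuplesPerPage * sizeof(int));
+ * if (so->numKilled < MaxIndexTuplesPerPage)
+ * so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+ * }
+ *
+ * and then, before releasing so->currPos.buf (passing true for haveLock
+ * instead if the read lock is still held at that point):
+ *
+ * if (so->numKilled > 0)
+ * _bt_killitems(scan, false);
+ */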
+
+
+/*
+ * The following routines manage a shared-memory area in which we track
+ * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
+ * operations. There is a single counter which increments each time we
+ * start a vacuum to assign it a cycle ID. Since multiple vacuums could
+ * be active concurrently, we have to track the cycle ID for each active
+ * vacuum; this requires at most MaxBackends entries (usually far fewer).
+ * We assume at most one vacuum can be active for a given index.
+ *
+ * Access to the shared memory area is controlled by BtreeVacuumLock.
+ * In principle we could use a separate lmgr locktag for each index,
+ * but a single LWLock is much cheaper, and given the short time that
+ * the lock is ever held, the concurrency hit should be minimal.
+ */
+
+typedef struct BTOneVacInfo
+{
+ LockRelId relid; /* global identifier of an index */
+ BTCycleId cycleid; /* cycle ID for its active VACUUM */
+} BTOneVacInfo;
+
+typedef struct BTVacInfo
+{
+ BTCycleId cycle_ctr; /* cycle ID most recently assigned */
+ int num_vacuums; /* number of currently active VACUUMs */
+ int max_vacuums; /* allocated length of vacuums[] array */
+ BTOneVacInfo vacuums[1]; /* VARIABLE LENGTH ARRAY */
+} BTVacInfo;
+
+static BTVacInfo *btvacinfo;
+
+
+/*
+ * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
+ * or zero if there is no active VACUUM
+ *
+ * Note: for correct interlocking, the caller must already hold pin and
+ * exclusive lock on each buffer it will store the cycle ID into. This
+ * ensures that even if a VACUUM starts immediately afterwards, it cannot
+ * process those pages until the page split is complete.
+ */
+BTCycleId
+_bt_vacuum_cycleid(Relation rel)
+{
+ BTCycleId result = 0;
+ int i;
+
+ /* Share lock is enough since this is a read-only operation */
+ LWLockAcquire(BtreeVacuumLock, LW_SHARED);
+
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ {
+ result = vac->cycleid;
+ break;
+ }
+ }
+
+ LWLockRelease(BtreeVacuumLock);
+ return result;
+}
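+
+/*
+ * For context, the interlock described above works because the page-split
+ * code fetches the cycle ID while it still holds exclusive lock on both
+ * halves of the split; a rough sketch of that step (see _bt_split):
+ *
+ * lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
+ * ropaque->btpo_cycleid = lopaque->btpo_cycleid;
+ */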
+
+/*
+ * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
+ *
+ * Note: the caller must guarantee that it will eventually call
+ * _bt_end_vacuum, else we'll permanently leak an array slot. To ensure
+ * that this happens even in elog(FATAL) scenarios, the appropriate coding
+ * is not just a PG_TRY, but
+ * PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
+ */
+BTCycleId
+_bt_start_vacuum(Relation rel)
+{
+ BTCycleId result;
+ int i;
+ BTOneVacInfo *vac;
+
+ LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+ /*
+ * Assign the next cycle ID, being careful to avoid zero as well as the
+ * reserved high values.
+ */
+ result = ++(btvacinfo->cycle_ctr);
+ if (result == 0 || result > MAX_BT_CYCLE_ID)
+ result = btvacinfo->cycle_ctr = 1;
+
+ /* Let's just make sure there's no entry already for this index */
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ vac = &btvacinfo->vacuums[i];
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ {
+ /*
+ * Unlike most places in the backend, we have to explicitly
+ * release our LWLock before throwing an error. This is because
+ * we expect _bt_end_vacuum() to be called before transaction
+ * abort cleanup can run to release LWLocks.
+ */
+ LWLockRelease(BtreeVacuumLock);
+ elog(ERROR, "multiple active vacuums for index \"%s\"",
+ RelationGetRelationName(rel));
+ }
+ }
+
+ /* OK, add an entry */
+ if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
+ {
+ LWLockRelease(BtreeVacuumLock);
+ elog(ERROR, "out of btvacinfo slots");
+ }
+ vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
+ vac->relid = rel->rd_lockInfo.lockRelId;
+ vac->cycleid = result;
+ btvacinfo->num_vacuums++;
+
+ LWLockRelease(BtreeVacuumLock);
+ return result;
+}
+
+/*
+ * _bt_end_vacuum --- mark a btree VACUUM operation as done
+ *
+ * Note: this is deliberately coded not to complain if no entry is found;
+ * this allows the caller to put PG_TRY around the start_vacuum operation.
+ */
+void
+_bt_end_vacuum(Relation rel)
+{
+ int i;
+
+ LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+ /* Find the array entry */
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ {
+ /* Remove it by shifting down the last entry */
+ *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
+ btvacinfo->num_vacuums--;
+ break;
+ }
+ }
+
+ LWLockRelease(BtreeVacuumLock);
+}
+
+/*
+ * _bt_end_vacuum wrapped as an on_shmem_exit callback function
+ */
+void
+_bt_end_vacuum_callback(int code, Datum arg)
+{
+ _bt_end_vacuum((Relation) DatumGetPointer(arg));
+}
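+
+/*
+ * Putting the three routines above together, the expected VACUUM-side
+ * coding is, in sketch form (the elided part stands for the actual scan
+ * over the index, stamping pages with the cycle ID as described above):
+ *
+ * PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
+ * {
+ * cycleid = _bt_start_vacuum(rel);
+ * ... scan the index using cycleid ...
+ * }
+ * PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
+ * _bt_end_vacuum(rel);
+ *
+ * This releases the shared-memory slot even if the scan fails with
+ * elog(ERROR) or elog(FATAL).
+ */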
+
+/*
+ * BTreeShmemSize --- report amount of shared memory space needed
+ */
+Size
+BTreeShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(BTVacInfo, vacuums[0]);
+ size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
+ return size;
+}
+
+/*
+ * BTreeShmemInit --- initialize this module's shared memory
+ */
+void
+BTreeShmemInit(void)
+{
+ bool found;
+
+ btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
+ BTreeShmemSize(),
+ &found);
+
+ if (!IsUnderPostmaster)
+ {
+ /* Initialize shared memory area */
+ Assert(!found);
+
+ /*
+ * It doesn't really matter what the cycle counter starts at, but
+ * having it always start the same doesn't seem good. Seed with
+ * low-order bits of time() instead.
+ */
+ btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
+
+ btvacinfo->num_vacuums = 0;
+ btvacinfo->max_vacuums = MaxBackends;
+ }
+ else
+ Assert(found);
+}
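+
+/*
+ * These two routines are expected to be wired into the server's shared
+ * memory setup (CreateSharedMemoryAndSemaphores in ipci.c); in sketch form:
+ *
+ * size = add_size(size, BTreeShmemSize());
+ * ...
+ * BTreeShmemInit();
+ */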
+
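+/*
+ * btoptions --- parse and validate reloptions for a btree index
+ */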
+Datum
+btoptions(PG_FUNCTION_ARGS)
+{
+ Datum reloptions = PG_GETARG_DATUM(0);
+ bool validate = PG_GETARG_BOOL(1);
+ bytea *result;
+
+ result = default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
+ if (result)
+ PG_RETURN_BYTEA_P(result);
+ PG_RETURN_NULL();