]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/gin/ginutil.c
Report progress of CREATE INDEX operations
[postgresql] / src / backend / access / gin / ginutil.c
index 52bca8cee3c5ab1797262e8f77ad2683b9541ee8..d2360eeafb0ee357473c7c76a4331dc9d4a2a598 100644 (file)
@@ -1,10 +1,10 @@
 /*-------------------------------------------------------------------------
  *
  * ginutil.c
- *       utilities routines for the postgres inverted index access method.
+ *       Utility routines for the Postgres inverted index access method.
  *
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include "postgres.h"
 
-#include "access/genam.h"
-#include "access/gin.h"
+#include "access/gin_private.h"
+#include "access/ginxlog.h"
 #include "access/reloptions.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
-#include "storage/bufmgr.h"
-#include "storage/freespace.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "utils/builtins.h"
+#include "utils/index_selfuncs.h"
+#include "utils/typcache.h"
 
+
+/*
+ * GIN handler function: return IndexAmRoutine with access method parameters
+ * and callbacks.
+ */
+Datum
+ginhandler(PG_FUNCTION_ARGS)
+{
+       IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
+
+       amroutine->amstrategies = 0;
+       amroutine->amsupport = GINNProcs;
+       amroutine->amcanorder = false;
+       amroutine->amcanorderbyop = false;
+       amroutine->amcanbackward = false;
+       amroutine->amcanunique = false;
+       amroutine->amcanmulticol = true;
+       amroutine->amoptionalkey = true;
+       amroutine->amsearcharray = false;
+       amroutine->amsearchnulls = false;
+       amroutine->amstorage = true;
+       amroutine->amclusterable = false;
+       amroutine->ampredlocks = true;
+       amroutine->amcanparallel = false;
+       amroutine->amcaninclude = false;
+       amroutine->amkeytype = InvalidOid;
+
+       amroutine->ambuild = ginbuild;
+       amroutine->ambuildempty = ginbuildempty;
+       amroutine->aminsert = gininsert;
+       amroutine->ambulkdelete = ginbulkdelete;
+       amroutine->amvacuumcleanup = ginvacuumcleanup;
+       amroutine->amcanreturn = NULL;
+       amroutine->amcostestimate = gincostestimate;
+       amroutine->amoptions = ginoptions;
+       amroutine->amproperty = NULL;
+       amroutine->ambuildphasename = NULL;
+       amroutine->amvalidate = ginvalidate;
+       amroutine->ambeginscan = ginbeginscan;
+       amroutine->amrescan = ginrescan;
+       amroutine->amgettuple = NULL;
+       amroutine->amgetbitmap = gingetbitmap;
+       amroutine->amendscan = ginendscan;
+       amroutine->ammarkpos = NULL;
+       amroutine->amrestrpos = NULL;
+       amroutine->amestimateparallelscan = NULL;
+       amroutine->aminitparallelscan = NULL;
+       amroutine->amparallelrescan = NULL;
+
+       PG_RETURN_POINTER(amroutine);
+}
+
+/*
+ * initGinState: fill in an empty GinState struct to describe the index
+ *
+ * Note: assorted subsidiary data is allocated in the CurrentMemoryContext.
+ */
 void
 initGinState(GinState *state, Relation index)
 {
+       TupleDesc       origTupdesc = RelationGetDescr(index);
        int                     i;
 
-       state->origTupdesc = index->rd_att;
+       MemSet(state, 0, sizeof(GinState));
 
-       state->oneCol = (index->rd_att->natts == 1) ? true : false;
+       state->index = index;
+       state->oneCol = (origTupdesc->natts == 1) ? true : false;
+       state->origTupdesc = origTupdesc;
 
-       for (i = 0; i < index->rd_att->natts; i++)
+       for (i = 0; i < origTupdesc->natts; i++)
        {
-               state->tupdesc[i] = CreateTemplateTupleDesc(2, false);
-
-               TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 1, NULL,
-                                                  INT2OID, -1, 0);
-               TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 2, NULL,
-                                                  index->rd_att->attrs[i]->atttypid,
-                                                  index->rd_att->attrs[i]->atttypmod,
-                                                  index->rd_att->attrs[i]->attndims
-                       );
-
-               fmgr_info_copy(&(state->compareFn[i]),
-                                          index_getprocinfo(index, i + 1, GIN_COMPARE_PROC),
-                                          CurrentMemoryContext);
+               Form_pg_attribute attr = TupleDescAttr(origTupdesc, i);
+
+               if (state->oneCol)
+                       state->tupdesc[i] = state->origTupdesc;
+               else
+               {
+                       state->tupdesc[i] = CreateTemplateTupleDesc(2);
+
+                       TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 1, NULL,
+                                                          INT2OID, -1, 0);
+                       TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 2, NULL,
+                                                          attr->atttypid,
+                                                          attr->atttypmod,
+                                                          attr->attndims);
+                       TupleDescInitEntryCollation(state->tupdesc[i], (AttrNumber) 2,
+                                                                               attr->attcollation);
+               }
+
+               /*
+                * If the compare proc isn't specified in the opclass definition, look
+                * up the index key type's default btree comparator.
+                */
+               if (index_getprocid(index, i + 1, GIN_COMPARE_PROC) != InvalidOid)
+               {
+                       fmgr_info_copy(&(state->compareFn[i]),
+                                                  index_getprocinfo(index, i + 1, GIN_COMPARE_PROC),
+                                                  CurrentMemoryContext);
+               }
+               else
+               {
+                       TypeCacheEntry *typentry;
+
+                       typentry = lookup_type_cache(attr->atttypid,
+                                                                                TYPECACHE_CMP_PROC_FINFO);
+                       if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                                                errmsg("could not identify a comparison function for type %s",
+                                                               format_type_be(attr->atttypid))));
+                       fmgr_info_copy(&(state->compareFn[i]),
+                                                  &(typentry->cmp_proc_finfo),
+                                                  CurrentMemoryContext);
+               }
+
+               /* Opclass must always provide extract procs */
                fmgr_info_copy(&(state->extractValueFn[i]),
                                           index_getprocinfo(index, i + 1, GIN_EXTRACTVALUE_PROC),
                                           CurrentMemoryContext);
                fmgr_info_copy(&(state->extractQueryFn[i]),
                                           index_getprocinfo(index, i + 1, GIN_EXTRACTQUERY_PROC),
                                           CurrentMemoryContext);
-               fmgr_info_copy(&(state->consistentFn[i]),
-                                          index_getprocinfo(index, i + 1, GIN_CONSISTENT_PROC),
-                                          CurrentMemoryContext);
+
+               /*
+                * Check opclass capability to do tri-state or binary logic consistent
+                * check.
+                */
+               if (index_getprocid(index, i + 1, GIN_TRICONSISTENT_PROC) != InvalidOid)
+               {
+                       fmgr_info_copy(&(state->triConsistentFn[i]),
+                                                  index_getprocinfo(index, i + 1, GIN_TRICONSISTENT_PROC),
+                                                  CurrentMemoryContext);
+               }
+
+               if (index_getprocid(index, i + 1, GIN_CONSISTENT_PROC) != InvalidOid)
+               {
+                       fmgr_info_copy(&(state->consistentFn[i]),
+                                                  index_getprocinfo(index, i + 1, GIN_CONSISTENT_PROC),
+                                                  CurrentMemoryContext);
+               }
+
+               if (state->consistentFn[i].fn_oid == InvalidOid &&
+                       state->triConsistentFn[i].fn_oid == InvalidOid)
+               {
+                       elog(ERROR, "missing GIN support function (%d or %d) for attribute %d of index \"%s\"",
+                                GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC,
+                                i + 1, RelationGetRelationName(index));
+               }
 
                /*
                 * Check opclass capability to do partial match.
@@ -64,15 +183,31 @@ initGinState(GinState *state, Relation index)
                if (index_getprocid(index, i + 1, GIN_COMPARE_PARTIAL_PROC) != InvalidOid)
                {
                        fmgr_info_copy(&(state->comparePartialFn[i]),
-                                  index_getprocinfo(index, i + 1, GIN_COMPARE_PARTIAL_PROC),
+                                                  index_getprocinfo(index, i + 1, GIN_COMPARE_PARTIAL_PROC),
                                                   CurrentMemoryContext);
-
                        state->canPartialMatch[i] = true;
                }
                else
                {
                        state->canPartialMatch[i] = false;
                }
+
+               /*
+                * If the index column has a specified collation, we should honor that
+                * while doing comparisons.  However, we may have a collatable storage
+                * type for a noncollatable indexed data type (for instance, hstore
+                * uses text index entries).  If there's no index collation then
+                * specify default collation in case the support functions need
+                * collation.  This is harmless if the support functions don't care
+                * about collation, so we just do it unconditionally.  (We could
+                * alternatively call get_typcollation, but that seems like expensive
+                * overkill --- there aren't going to be any cases where a GIN storage
+                * type has a nondefault collation.)
+                */
+               if (OidIsValid(index->rd_indcollation[i]))
+                       state->supportCollation[i] = index->rd_indcollation[i];
+               else
+                       state->supportCollation[i] = DEFAULT_COLLATION_OID;
        }
 }
 
@@ -82,9 +217,14 @@ initGinState(GinState *state, Relation index)
 OffsetNumber
 gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
 {
-       OffsetNumber colN = FirstOffsetNumber;
+       OffsetNumber colN;
 
-       if (!ginstate->oneCol)
+       if (ginstate->oneCol)
+       {
+               /* column number is not stored explicitly */
+               colN = FirstOffsetNumber;
+       }
+       else
        {
                Datum           res;
                bool            isnull;
@@ -105,13 +245,14 @@ gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
 }
 
 /*
- * Extract stored datum from GIN tuple
+ * Extract stored datum (and possible null category) from GIN tuple
  */
 Datum
-gin_index_getattr(GinState *ginstate, IndexTuple tuple)
+gintuple_get_key(GinState *ginstate, IndexTuple tuple,
+                                GinNullCategory *category)
 {
-       bool            isnull;
        Datum           res;
+       bool            isnull;
 
        if (ginstate->oneCol)
        {
@@ -134,7 +275,10 @@ gin_index_getattr(GinState *ginstate, IndexTuple tuple)
                                                        &isnull);
        }
 
-       Assert(!isnull);
+       if (isnull)
+               *category = GinGetNullCategory(tuple, ginstate);
+       else
+               *category = GIN_CAT_NORM_KEY;
 
        return res;
 }
@@ -144,7 +288,6 @@ gin_index_getattr(GinState *ginstate, IndexTuple tuple)
  * The returned buffer is already pinned and exclusive-locked
  * Caller is responsible for initializing the page by calling GinInitBuffer
  */
-
 Buffer
 GinNewBuffer(Relation index)
 {
@@ -167,12 +310,7 @@ GinNewBuffer(Relation index)
                 */
                if (ConditionalLockBuffer(buffer))
                {
-                       Page            page = BufferGetPage(buffer);
-
-                       if (PageIsNew(page))
-                               return buffer;  /* OK to use, if never initialized */
-
-                       if (GinPageIsDeleted(page))
+                       if (GinPageIsRecyclable(BufferGetPage(buffer)))
                                return buffer;  /* OK to use */
 
                        LockBuffer(buffer, GIN_UNLOCK);
@@ -233,115 +371,244 @@ GinInitMetabuffer(Buffer b)
        metadata->nEntryPages = 0;
        metadata->nDataPages = 0;
        metadata->nEntries = 0;
+       metadata->ginVersion = GIN_CURRENT_VERSION;
+
+       /*
+        * Set pd_lower just past the end of the metadata.  This is essential,
+        * because without doing so, metadata will be lost if xlog.c compresses
+        * the page.
+        */
+       ((PageHeader) page)->pd_lower =
+               ((char *) metadata + sizeof(GinMetaPageData)) - (char *) page;
 }
 
+/*
+ * Compare two keys of the same index column
+ */
 int
-compareEntries(GinState *ginstate, OffsetNumber attnum, Datum a, Datum b)
+ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
+                                 Datum a, GinNullCategory categorya,
+                                 Datum b, GinNullCategory categoryb)
 {
-       return DatumGetInt32(
-                                                FunctionCall2(
-                                                                          &ginstate->compareFn[attnum - 1],
-                                                                          a, b
-                                                                          )
-               );
+       /* if not of same null category, sort by that first */
+       if (categorya != categoryb)
+               return (categorya < categoryb) ? -1 : 1;
+
+       /* all null items in same category are equal */
+       if (categorya != GIN_CAT_NORM_KEY)
+               return 0;
+
+       /* both not null, so safe to call the compareFn */
+       return DatumGetInt32(FunctionCall2Coll(&ginstate->compareFn[attnum - 1],
+                                                                                  ginstate->supportCollation[attnum - 1],
+                                                                                  a, b));
 }
 
+/*
+ * Compare two keys of possibly different index columns
+ */
 int
-compareAttEntries(GinState *ginstate, OffsetNumber attnum_a, Datum a,
-                                 OffsetNumber attnum_b, Datum b)
+ginCompareAttEntries(GinState *ginstate,
+                                        OffsetNumber attnuma, Datum a, GinNullCategory categorya,
+                                        OffsetNumber attnumb, Datum b, GinNullCategory categoryb)
 {
-       if (attnum_a == attnum_b)
-               return compareEntries(ginstate, attnum_a, a, b);
+       /* attribute number is the first sort key */
+       if (attnuma != attnumb)
+               return (attnuma < attnumb) ? -1 : 1;
 
-       return (attnum_a < attnum_b) ? -1 : 1;
+       return ginCompareEntries(ginstate, attnuma, a, categorya, b, categoryb);
 }
 
+
+/*
+ * Support for sorting key datums in ginExtractEntries
+ *
+ * Note: we only have to worry about null and not-null keys here;
+ * ginExtractEntries never generates more than one placeholder null,
+ * so it doesn't have to sort those.
+ */
+typedef struct
+{
+       Datum           datum;
+       bool            isnull;
+} keyEntryData;
+
 typedef struct
 {
        FmgrInfo   *cmpDatumFunc;
-       bool       *needUnique;
-} cmpEntriesData;
+       Oid                     collation;
+       bool            haveDups;
+} cmpEntriesArg;
 
 static int
-cmpEntries(const Datum *a, const Datum *b, cmpEntriesData *arg)
+cmpEntries(const void *a, const void *b, void *arg)
 {
-       int                     res = DatumGetInt32(FunctionCall2(arg->cmpDatumFunc,
-                                                                                                 *a, *b));
+       const keyEntryData *aa = (const keyEntryData *) a;
+       const keyEntryData *bb = (const keyEntryData *) b;
+       cmpEntriesArg *data = (cmpEntriesArg *) arg;
+       int                     res;
 
+       if (aa->isnull)
+       {
+               if (bb->isnull)
+                       res = 0;                        /* NULL "=" NULL */
+               else
+                       res = 1;                        /* NULL ">" not-NULL */
+       }
+       else if (bb->isnull)
+               res = -1;                               /* not-NULL "<" NULL */
+       else
+               res = DatumGetInt32(FunctionCall2Coll(data->cmpDatumFunc,
+                                                                                         data->collation,
+                                                                                         aa->datum, bb->datum));
+
+       /*
+        * Detect if we have any duplicates.  If there are equal keys, qsort must
+        * compare them at some point, else it wouldn't know whether one should go
+        * before or after the other.
+        */
        if (res == 0)
-               *(arg->needUnique) = TRUE;
+               data->haveDups = true;
 
        return res;
 }
 
+
+/*
+ * Extract the index key values from an indexable item
+ *
+ * The resulting key values are sorted, and any duplicates are removed.
+ * This avoids generating redundant index entries.
+ */
 Datum *
-extractEntriesS(GinState *ginstate, OffsetNumber attnum, Datum value, int32 *nentries,
-                               bool *needUnique)
+ginExtractEntries(GinState *ginstate, OffsetNumber attnum,
+                                 Datum value, bool isNull,
+                                 int32 *nentries, GinNullCategory **categories)
 {
        Datum      *entries;
-
-       entries = (Datum *) DatumGetPointer(FunctionCall2(
-                                                                          &ginstate->extractValueFn[attnum - 1],
-                                                                                                         value,
-                                                                                                       PointerGetDatum(nentries)
-                                                                                                         ));
-
-       if (entries == NULL)
-               *nentries = 0;
-
-       *needUnique = FALSE;
-       if (*nentries > 1)
+       bool       *nullFlags;
+       int32           i;
+
+       /*
+        * We don't call the extractValueFn on a null item.  Instead generate a
+        * placeholder.
+        */
+       if (isNull)
        {
-               cmpEntriesData arg;
-
-               arg.cmpDatumFunc = &ginstate->compareFn[attnum - 1];
-               arg.needUnique = needUnique;
-               qsort_arg(entries, *nentries, sizeof(Datum),
-                                 (qsort_arg_comparator) cmpEntries, (void *) &arg);
+               *nentries = 1;
+               entries = (Datum *) palloc(sizeof(Datum));
+               entries[0] = (Datum) 0;
+               *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
+               (*categories)[0] = GIN_CAT_NULL_ITEM;
+               return entries;
        }
 
-       return entries;
-}
-
-
-Datum *
-extractEntriesSU(GinState *ginstate, OffsetNumber attnum, Datum value, int32 *nentries)
-{
-       bool            needUnique;
-       Datum      *entries = extractEntriesS(ginstate, attnum, value, nentries,
-                                                                                 &needUnique);
+       /* OK, call the opclass's extractValueFn */
+       nullFlags = NULL;                       /* in case extractValue doesn't set it */
+       entries = (Datum *)
+               DatumGetPointer(FunctionCall3Coll(&ginstate->extractValueFn[attnum - 1],
+                                                                                 ginstate->supportCollation[attnum - 1],
+                                                                                 value,
+                                                                                 PointerGetDatum(nentries),
+                                                                                 PointerGetDatum(&nullFlags)));
+
+       /*
+        * Generate a placeholder if the item contained no keys.
+        */
+       if (entries == NULL || *nentries <= 0)
+       {
+               *nentries = 1;
+               entries = (Datum *) palloc(sizeof(Datum));
+               entries[0] = (Datum) 0;
+               *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
+               (*categories)[0] = GIN_CAT_EMPTY_ITEM;
+               return entries;
+       }
 
-       if (needUnique)
+       /*
+        * If the extractValueFn didn't create a nullFlags array, create one,
+        * assuming that everything's non-null.
+        */
+       if (nullFlags == NULL)
+               nullFlags = (bool *) palloc0(*nentries * sizeof(bool));
+
+       /*
+        * If there's more than one key, sort and unique-ify.
+        *
+        * XXX Using qsort here is notationally painful, and the overhead is
+        * pretty bad too.  For small numbers of keys it'd likely be better to use
+        * a simple insertion sort.
+        */
+       if (*nentries > 1)
        {
-               Datum      *ptr,
-                                  *res;
+               keyEntryData *keydata;
+               cmpEntriesArg arg;
 
-               ptr = res = entries;
+               keydata = (keyEntryData *) palloc(*nentries * sizeof(keyEntryData));
+               for (i = 0; i < *nentries; i++)
+               {
+                       keydata[i].datum = entries[i];
+                       keydata[i].isnull = nullFlags[i];
+               }
+
+               arg.cmpDatumFunc = &ginstate->compareFn[attnum - 1];
+               arg.collation = ginstate->supportCollation[attnum - 1];
+               arg.haveDups = false;
+               qsort_arg(keydata, *nentries, sizeof(keyEntryData),
+                                 cmpEntries, (void *) &arg);
 
-               while (ptr - entries < *nentries)
+               if (arg.haveDups)
+               {
+                       /* there are duplicates, must get rid of 'em */
+                       int32           j;
+
+                       entries[0] = keydata[0].datum;
+                       nullFlags[0] = keydata[0].isnull;
+                       j = 1;
+                       for (i = 1; i < *nentries; i++)
+                       {
+                               if (cmpEntries(&keydata[i - 1], &keydata[i], &arg) != 0)
+                               {
+                                       entries[j] = keydata[i].datum;
+                                       nullFlags[j] = keydata[i].isnull;
+                                       j++;
+                               }
+                       }
+                       *nentries = j;
+               }
+               else
                {
-                       if (compareEntries(ginstate, attnum, *ptr, *res) != 0)
-                               *(++res) = *ptr++;
-                       else
-                               ptr++;
+                       /* easy, no duplicates */
+                       for (i = 0; i < *nentries; i++)
+                       {
+                               entries[i] = keydata[i].datum;
+                               nullFlags[i] = keydata[i].isnull;
+                       }
                }
 
-               *nentries = res + 1 - entries;
+               pfree(keydata);
        }
 
+       /*
+        * Create GinNullCategory representation from nullFlags.
+        */
+       *categories = (GinNullCategory *) palloc0(*nentries * sizeof(GinNullCategory));
+       for (i = 0; i < *nentries; i++)
+               (*categories)[i] = (nullFlags[i] ? GIN_CAT_NULL_KEY : GIN_CAT_NORM_KEY);
+
        return entries;
 }
 
-Datum
-ginoptions(PG_FUNCTION_ARGS)
+bytea *
+ginoptions(Datum reloptions, bool validate)
 {
-       Datum           reloptions = PG_GETARG_DATUM(0);
-       bool            validate = PG_GETARG_BOOL(1);
        relopt_value *options;
        GinOptions *rdopts;
        int                     numoptions;
        static const relopt_parse_elt tab[] = {
-               {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)}
+               {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)},
+               {"gin_pending_list_limit", RELOPT_TYPE_INT, offsetof(GinOptions,
+                                                                                                                        pendingListCleanupSize)}
        };
 
        options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN,
@@ -349,7 +616,7 @@ ginoptions(PG_FUNCTION_ARGS)
 
        /* if none set, we're done */
        if (numoptions == 0)
-               PG_RETURN_NULL();
+               return NULL;
 
        rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
 
@@ -358,21 +625,21 @@ ginoptions(PG_FUNCTION_ARGS)
 
        pfree(options);
 
-       PG_RETURN_BYTEA_P(rdopts);
+       return (bytea *) rdopts;
 }
 
 /*
  * Fetch index's statistical data into *stats
  *
  * Note: in the result, nPendingPages can be trusted to be up-to-date,
- * but the other fields are as of the last VACUUM.
+ * as can ginVersion; but the other fields are as of the last VACUUM.
  */
 void
 ginGetStats(Relation index, GinStatsData *stats)
 {
-       Buffer                  metabuffer;
-       Page                    metapage;
-       GinMetaPageData *metadata;
+       Buffer          metabuffer;
+       Page            metapage;
+       GinMetaPageData *metadata;
 
        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
        LockBuffer(metabuffer, GIN_SHARE);
@@ -384,6 +651,7 @@ ginGetStats(Relation index, GinStatsData *stats)
        stats->nEntryPages = metadata->nEntryPages;
        stats->nDataPages = metadata->nDataPages;
        stats->nEntries = metadata->nEntries;
+       stats->ginVersion = metadata->ginVersion;
 
        UnlockReleaseBuffer(metabuffer);
 }
@@ -391,14 +659,14 @@ ginGetStats(Relation index, GinStatsData *stats)
 /*
  * Write the given statistics to the index's metapage
  *
- * Note: nPendingPages is *not* copied over
+ * Note: nPendingPages and ginVersion are *not* copied over
  */
 void
 ginUpdateStats(Relation index, const GinStatsData *stats)
 {
-       Buffer                  metabuffer;
-       Page                    metapage;
-       GinMetaPageData *metadata;
+       Buffer          metabuffer;
+       Page            metapage;
+       GinMetaPageData *metadata;
 
        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
@@ -412,27 +680,34 @@ ginUpdateStats(Relation index, const GinStatsData *stats)
        metadata->nDataPages = stats->nDataPages;
        metadata->nEntries = stats->nEntries;
 
+       /*
+        * Set pd_lower just past the end of the metadata.  This is essential,
+        * because without doing so, metadata will be lost if xlog.c compresses
+        * the page.  (We must do this here because pre-v11 versions of PG did not
+        * set the metapage's pd_lower correctly, so a pg_upgraded index might
+        * contain the wrong value.)
+        */
+       ((PageHeader) metapage)->pd_lower =
+               ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
+
        MarkBufferDirty(metabuffer);
 
-       if (!index->rd_istemp)
+       if (RelationNeedsWAL(index))
        {
-               XLogRecPtr                      recptr;
-               ginxlogUpdateMeta       data;
-               XLogRecData                     rdata;
+               XLogRecPtr      recptr;
+               ginxlogUpdateMeta data;
 
                data.node = index->rd_node;
                data.ntuples = 0;
                data.newRightlink = data.prevTail = InvalidBlockNumber;
                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
 
-               rdata.buffer = InvalidBuffer;
-               rdata.data = (char *) &data;
-               rdata.len = sizeof(ginxlogUpdateMeta);
-               rdata.next = NULL;
+               XLogBeginInsert();
+               XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
+               XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
 
-               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, &rdata);
+               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
                PageSetLSN(metapage, recptr);
-               PageSetTLI(metapage, ThisTimeLineID);
        }
 
        UnlockReleaseBuffer(metabuffer);