]> granicus.if.org Git - postgresql/commitdiff
Teach SPGiST to store nulls and do whole-index scans.
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 11 Mar 2012 20:29:04 +0000 (16:29 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 11 Mar 2012 20:29:59 +0000 (16:29 -0400)
This patch fixes the other major compatibility-breaking limitation of
SPGiST, that it didn't store anything for null values of the indexed
column, and so could not support whole-index scans or "x IS NULL"
tests.  The approach is to create a wholly separate search tree for
the null entries, and use fixed "allTheSame" insertion and search
rules when processing this tree, instead of calling the index opclass
methods.  This way the opclass methods do not need to worry about
dealing with nulls.

Catversion bump is for pg_am updates as well as the change in on-disk
format of SPGiST indexes; there are some tweaks in SPGiST WAL records
as well.

Heavily rewritten version of a patch by Oleg Bartunov and Teodor Sigaev.
(The original also stored nulls separately, but it reused GIN code to do
so; which required undesirable compromises in the on-disk format, and
would likely lead to bugs due to the GIN code being required to work in
two very different contexts.)

13 files changed:
doc/src/sgml/spgist.sgml
src/backend/access/spgist/README
src/backend/access/spgist/spgdoinsert.c
src/backend/access/spgist/spginsert.c
src/backend/access/spgist/spgscan.c
src/backend/access/spgist/spgutils.c
src/backend/access/spgist/spgvacuum.c
src/backend/access/spgist/spgxlog.c
src/include/access/spgist_private.h
src/include/catalog/catversion.h
src/include/catalog/pg_am.h
src/test/regress/expected/create_index.out
src/test/regress/sql/create_index.sql

index 0202dbcdd5a260bb66e18c2b7baab85de7bad44c..fd312cf4368ea90e4ec136a66cfe70754de6f1c7 100644 (file)
   value when that is needed.
  </para>
 
+ <note>
+  <para>
+   The <acronym>SP-GiST</acronym> core code takes care of NULL entries.
+   Although <acronym>SP-GiST</acronym> indexes do store entries for nulls
+   in indexed columns, this is hidden from the index operator class code:
+   no null index entries or search conditions will ever be passed to the
+   operator class methods.  (It is assumed that <acronym>SP-GiST</acronym>
+   operators are strict and so cannot succeed for NULL values.)  NULLs
+   are therefore not discussed further here.
+  </para>
+ </note>
+
  <para>
   There are five user-defined methods that an index operator class for
   <acronym>SP-GiST</acronym> must provide.  All five follow the convention
index 4ff0e357cb4238bc9a73137c69606aed869e5b39..d20ad17a4b66980d0cd59afff8c1523e6ddb1366 100644 (file)
@@ -11,6 +11,7 @@ should have a high fanout to minimize I/O.  The challenge is to map tree
 nodes to disk pages in such a way that the search algorithm accesses only a
 few disk pages, even if it traverses many nodes.
 
+
 COMMON STRUCTURE DESCRIPTION
 
 Logically, an SP-GiST tree is a set of tuples, each of which can be either
@@ -71,6 +72,21 @@ Leaf tuple consists of:
 
   ItemPointer to the heap
 
+
+NULLS HANDLING
+
+We assume that SPGiST-indexable operators are strict (can never succeed for
+null inputs).  It is still desirable to index nulls, so that whole-table
+indexscans are possible and so that "x IS NULL" can be implemented by an
+SPGiST indexscan.  However, we prefer that SPGiST index opclasses not have
+to cope with nulls.  Therefore, the main tree of an SPGiST index does not
+include any null entries.  We store null entries in a separate SPGiST tree
+occupying a disjoint set of pages (in particular, its own root page).
+Insertions and searches in the nulls tree do not use any of the
+opclass-supplied functions, but just use hardwired logic comparable to
+AllTheSame cases in the normal tree.
+
+
 INSERTION ALGORITHM
 
 Insertion algorithm is designed to keep the tree in a consistent state at
@@ -181,6 +197,7 @@ described in (5).
 and a new tuple to another page, if the list is short enough. This improves
 space utilization, but doesn't change the basis of the algorithm.
 
+
 CONCURRENCY
 
 While descending the tree, the insertion algorithm holds exclusive lock on
@@ -218,6 +235,7 @@ scan that had already visited the parent level could possibly reach such a
 redirect tuple, so we can remove redirects once all active transactions have
 been flushed out of the system.
 
+
 DEAD TUPLES
 
 Tuples on leaf pages can be in one of four states:
@@ -269,6 +287,7 @@ to PLACEHOLDER status by VACUUM, and are then candidates for replacement.
 DEAD state is not currently possible, since VACUUM does not attempt to
 remove unused inner tuples.
 
+
 VACUUM
 
 VACUUM (or more precisely, spgbulkdelete) performs a single sequential scan
@@ -302,13 +321,16 @@ performed; otherwise, it does an spgbulkdelete scan with an empty target
 list, so as to clean up redirections and placeholders, update the free
 space map, and gather statistics.
 
+
 LAST USED PAGE MANAGEMENT
 
-List of last used pages contains four pages - a leaf page and three inner
-pages, one from each "triple parity" group.  This list is stored between
-calls on the index meta page, but updates are never WAL-logged to decrease
-WAL traffic.  Incorrect data on meta page isn't critical, because we could
-allocate a new page at any moment.
+The list of last used pages contains four pages - a leaf page and three
+inner pages, one from each "triple parity" group.  (Actually, there's one
+such list for the main tree and a separate one for the nulls tree.)  This
+list is stored between calls on the index meta page, but updates are never
+WAL-logged to decrease WAL traffic.  Incorrect data on meta page isn't
+critical, because we could allocate a new page at any moment.
+
 
 AUTHORS
 
index 85704762a6f242d48298f24b2d894f2cc5db9790..5ddb6672c5c8cb3877a47ca9952fae74b38704b7 100644 (file)
@@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent,
  */
 static void
 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
-                        SPPageDesc *current, SPPageDesc *parent, bool isNew)
+                        SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 {
        XLogRecData rdata[4];
        spgxlogAddLeaf xlrec;
@@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        xlrec.node = index->rd_node;
        xlrec.blknoLeaf = current->blkno;
        xlrec.newPage = isNew;
+       xlrec.storesNulls = isNulls;
 
        /* these will be filled below as needed */
        xlrec.offnumLeaf = InvalidOffsetNumber;
@@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        START_CRIT_SECTION();
 
        if (current->offnum == InvalidOffsetNumber ||
-               current->blkno == SPGIST_HEAD_BLKNO)
+               SpGistBlockIsRoot(current->blkno))
        {
                /* Tuple is not part of a chain */
                leafTuple->nextOffset = InvalidOffsetNumber;
@@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state,
                                n = 0,
                                totalSize = 0;
 
-       if (current->blkno == SPGIST_HEAD_BLKNO)
+       if (SpGistBlockIsRoot(current->blkno))
        {
                /* return impossible values to force split */
                *nToSplit = BLCKSZ;
@@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state,
 static void
 moveLeafs(Relation index, SpGistState *state,
                  SPPageDesc *current, SPPageDesc *parent,
-                 SpGistLeafTuple newLeafTuple)
+                 SpGistLeafTuple newLeafTuple, bool isNulls)
 {
        int                     i,
                                nDelete,
@@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state,
        }
 
        /* Find a leaf page that will hold them */
-       nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
+       nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
+                                                  size, &xlrec.newPage);
        npage = BufferGetPage(nbuf);
        nblkno = BufferGetBlockNumber(nbuf);
        Assert(nblkno != current->blkno);
@@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state,
        xlrec.blknoDst = nblkno;
        xlrec.nMoves = nDelete;
        xlrec.replaceDead = replaceDead;
+       xlrec.storesNulls = isNulls;
 
        xlrec.blknoParent = parent->blkno;
        xlrec.offnumParent = parent->offnum;
@@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
  * If so, randomly divide the tuples into several nodes (all with the same
  * label) and return TRUE to select allTheSame mode for this inner tuple.
  *
+ * (This code is also used to forcibly select allTheSame mode for nulls.)
+ *
  * If we know that the leaf tuples wouldn't all fit on one page, then we
  * exclude the last tuple (which is the incoming new tuple that forced a split)
  * from the check to see if more than one node is used.  The reason for this
@@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
 static bool
 doPickSplit(Relation index, SpGistState *state,
                        SPPageDesc *current, SPPageDesc *parent,
-                       SpGistLeafTuple newLeafTuple, int level, bool isNew)
+                       SpGistLeafTuple newLeafTuple,
+                       int level, bool isNulls, bool isNew)
 {
        bool            insertedNew = false;
        spgPickSplitIn in;
@@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state,
         * also, count up the amount of space that will be freed from current.
         * (Note that in the non-root case, we won't actually delete the old
         * tuples, only replace them with redirects or placeholders.)
+        *
+        * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
+        * page.  For a pass-by-value data type we will fetch a word that must
+        * exist even though it may contain garbage (because of the fact that leaf
+        * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
+        * we are just computing a pointer that isn't going to get dereferenced.
+        * So it's not worth guarding the calls with isNulls checks.
         */
        nToInsert = 0;
        nToDelete = 0;
        spaceToDelete = 0;
-       if (current->blkno == SPGIST_HEAD_BLKNO)
+       if (SpGistBlockIsRoot(current->blkno))
        {
                /*
                 * We are splitting the root (which up to now is also a leaf page).
@@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state,
        heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
        in.nTuples++;
 
-       /*
-        * Perform split using user-defined method.
-        */
        memset(&out, 0, sizeof(out));
 
-       procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
-       FunctionCall2Coll(procinfo,
-                                         index->rd_indcollation[0],
-                                         PointerGetDatum(&in),
-                                         PointerGetDatum(&out));
+       if (!isNulls)
+       {
+               /*
+                * Perform split using user-defined method.
+                */
+               procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
+               FunctionCall2Coll(procinfo,
+                                                 index->rd_indcollation[0],
+                                                 PointerGetDatum(&in),
+                                                 PointerGetDatum(&out));
 
-       /*
-        * Form new leaf tuples and count up the total space needed.
-        */
-       totalLeafSizes = 0;
-       for (i = 0; i < in.nTuples; i++)
+               /*
+                * Form new leaf tuples and count up the total space needed.
+                */
+               totalLeafSizes = 0;
+               for (i = 0; i < in.nTuples; i++)
+               {
+                       newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+                                                                                  out.leafTupleDatums[i],
+                                                                                  false);
+                       totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               }
+       }
+       else
        {
-               newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-                                                                          out.leafTupleDatums[i]);
-               totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               /*
+                * Perform dummy split that puts all tuples into one node.
+                * checkAllTheSame will override this and force allTheSame mode.
+                */
+               out.hasPrefix = false;
+               out.nNodes = 1;
+               out.nodeLabels = NULL;
+               out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
+
+               /*
+                * Form new leaf tuples and count up the total space needed.
+                */
+               totalLeafSizes = 0;
+               for (i = 0; i < in.nTuples; i++)
+               {
+                       newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+                                                                                  (Datum) 0,
+                                                                                  true);
+                       totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               }
        }
 
        /*
@@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state,
        for (i = 0; i < out.nNodes; i++)
        {
                Datum           label = (Datum) 0;
-               bool            isnull = (out.nodeLabels == NULL);
+               bool            labelisnull = (out.nodeLabels == NULL);
 
-               if (!isnull)
+               if (!labelisnull)
                        label = out.nodeLabels[i];
-               nodes[i] = spgFormNodeTuple(state, label, isnull);
+               nodes[i] = spgFormNodeTuple(state, label, labelisnull);
        }
        innerTuple = spgFormInnerTuple(state,
                                                                   out.hasPrefix, out.prefixDatum,
@@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state,
         */
        xlrec.initInner = false;
        if (parent->buffer != InvalidBuffer &&
-               parent->blkno != SPGIST_HEAD_BLKNO &&
+               !SpGistBlockIsRoot(parent->blkno) &&
                (SpGistPageGetFreeSpace(parent->page, 1) >=
                 innerTuple->size + sizeof(ItemIdData)))
        {
@@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state,
        {
                /* Send tuple to page with next triple parity (see README) */
                newInnerBuffer = SpGistGetBuffer(index,
-                                                                                GBUF_INNER_PARITY(parent->blkno + 1),
+                                                                                GBUF_INNER_PARITY(parent->blkno + 1) |
+                                                                                (isNulls ? GBUF_NULLS : 0),
                                                                                 innerTuple->size + sizeof(ItemIdData),
                                                                                 &xlrec.initInner);
        }
@@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state,
                newInnerBuffer = InvalidBuffer;
        }
 
-       /*----------
+       /*
         * Because a WAL record can't involve more than four buffers, we can
         * only afford to deal with two leaf pages in each picksplit action,
         * ie the current page and at most one other.
@@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state,
         * If we are splitting the root page (turning it from a leaf page into an
         * inner page), then no leaf tuples can go back to the current page; they
         * must all go somewhere else.
-        *----------
         */
-       if (current->blkno != SPGIST_HEAD_BLKNO)
+       if (!SpGistBlockIsRoot(current->blkno))
                currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
        else
                currentFreeSpace = 0;   /* prevent assigning any tuples to current */
@@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state,
                int                     curspace;
                int                     newspace;
 
-               newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
+               newLeafBuffer = SpGistGetBuffer(index,
+                                                                               GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
                                                                                Min(totalLeafSizes,
                                                                                        SPGIST_PAGE_CAPACITY),
                                                                                &xlrec.initDest);
@@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state,
        xlrec.blknoDest = InvalidBlockNumber;
        xlrec.nDelete = 0;
        xlrec.initSrc = isNew;
+       xlrec.storesNulls = isNulls;
 
        leafdata = leafptr = (char *) palloc(totalLeafSizes);
 
@@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state,
         * the root; in that case there's no need because we'll re-init the page
         * below.  We do this first to make room for reinserting new leaf tuples.
         */
-       if (current->blkno != SPGIST_HEAD_BLKNO)
+       if (!SpGistBlockIsRoot(current->blkno))
        {
                /*
                 * Init buffer instead of deleting individual tuples, but only if
@@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state,
                        nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
                        PageGetMaxOffsetNumber(current->page))
                {
-                       SpGistInitBuffer(current->buffer, SPGIST_LEAF);
+                       SpGistInitBuffer(current->buffer,
+                                                        SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
                        xlrec.initSrc = true;
                }
                else if (isNew)
@@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state,
                 * Splitting root page, which was a leaf but now becomes inner page
                 * (and so "current" continues to point at it)
                 */
-               Assert(current->blkno == SPGIST_HEAD_BLKNO);
+               Assert(SpGistBlockIsRoot(current->blkno));
                Assert(redirectTuplePos == InvalidOffsetNumber);
 
-               SpGistInitBuffer(current->buffer, 0);
+               SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
                xlrec.initInner = true;
 
                xlrec.blknoInner = current->blkno;
@@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state,
        XLogRecData rdata[5];
        spgxlogAddNode xlrec;
 
+       /* Should not be applied to nulls */
+       Assert(!SpGistPageStoresNulls(current->page));
+
        /* Construct new inner tuple with additional node */
        newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
 
@@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
                 * allow only one inner tuple on the root page, and spgFormInnerTuple
                 * always checks that inner tuples don't exceed the size of a page.
                 */
-               if (current->blkno == SPGIST_HEAD_BLKNO)
+               if (SpGistBlockIsRoot(current->blkno))
                        elog(ERROR, "cannot enlarge root tuple any more");
                Assert(parent->buffer != InvalidBuffer);
 
@@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state,
        spgxlogSplitTuple xlrec;
        Buffer          newBuffer = InvalidBuffer;
 
+       /* Should not be applied to nulls */
+       Assert(!SpGistPageStoresNulls(current->page));
+
        /*
         * Construct new prefix tuple, containing a single node with the
         * specified label.  (We'll update the node's downlink to point to the
@@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
         * For the space calculation, note that prefixTuple replaces innerTuple
         * but postfixTuple will be a new entry.
         */
-       if (current->blkno == SPGIST_HEAD_BLKNO ||
+       if (SpGistBlockIsRoot(current->blkno) ||
                SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
                prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
        {
@@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
  */
 void
 spgdoinsert(Relation index, SpGistState *state,
-                       ItemPointer heapPtr, Datum datum)
+                       ItemPointer heapPtr, Datum datum, bool isnull)
 {
        int                     level = 0;
        Datum           leafDatum;
@@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state,
         * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
         * that.
         */
-       if (state->attType.attlen == -1)
+       if (!isnull && state->attType.attlen == -1)
                datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
 
        leafDatum = datum;
@@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state,
         * If it isn't gonna fit, and the opclass can't reduce the datum size by
         * suffixing, bail out now rather than getting into an endless loop.
         */
-       leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-               SpGistGetTypeSize(&state->attType, leafDatum);
+       if (!isnull)
+               leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+                       SpGistGetTypeSize(&state->attType, leafDatum);
+       else
+               leafSize = SGDTSIZE + sizeof(ItemIdData);
 
        if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
                ereport(ERROR,
@@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state,
                                   RelationGetRelationName(index)),
                  errhint("Values larger than a buffer page cannot be indexed.")));
 
-       /* Initialize "current" to the root page */
-       current.blkno = SPGIST_HEAD_BLKNO;
+       /* Initialize "current" to the appropriate root page */
+       current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
        current.buffer = InvalidBuffer;
        current.page = NULL;
        current.offnum = FirstOffsetNumber;
@@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state,
                         * for doPickSplit to always have a leaf page at hand; so just
                         * quietly limit our request to a page size.
                         */
-                       current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
-                                                                                        Min(leafSize,
-                                                                                                SPGIST_PAGE_CAPACITY),
-                                                                                        &isNew);
+                       current.buffer =
+                               SpGistGetBuffer(index,
+                                                               GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+                                                               Min(leafSize, SPGIST_PAGE_CAPACITY),
+                                                               &isNew);
                        current.blkno = BufferGetBlockNumber(current.buffer);
                }
                else if (parent.buffer == InvalidBuffer ||
@@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state,
                }
                current.page = BufferGetPage(current.buffer);
 
+               /* should not arrive at a page of the wrong type */
+               if (isnull ? !SpGistPageStoresNulls(current.page) :
+                       SpGistPageStoresNulls(current.page))
+                       elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+                                current.blkno);
+
                if (SpGistPageIsLeaf(current.page))
                {
                        SpGistLeafTuple leafTuple;
                        int                     nToSplit,
                                                sizeToSplit;
 
-                       leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
+                       leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
                        if (leafTuple->size + sizeof(ItemIdData) <=
                                SpGistPageGetFreeSpace(current.page, 1))
                        {
                                /* it fits on page, so insert it and we're done */
                                addLeafTuple(index, state, leafTuple,
-                                                        &current, &parent, isNew);
+                                                        &current, &parent, isnull, isNew);
                                break;
                        }
                        else if ((sizeToSplit =
@@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state,
                                 * chain to another leaf page rather than splitting it.
                                 */
                                Assert(!isNew);
-                               moveLeafs(index, state, &current, &parent, leafTuple);
+                               moveLeafs(index, state, &current, &parent, leafTuple, isnull);
                                break;                  /* we're done */
                        }
                        else
                        {
                                /* picksplit */
                                if (doPickSplit(index, state, &current, &parent,
-                                                               leafTuple, level, isNew))
+                                                               leafTuple, level, isnull, isNew))
                                        break;          /* doPickSplit installed new tuples */
 
                                /* leaf tuple will not be inserted yet */
@@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state,
 
                        memset(&out, 0, sizeof(out));
 
-                       procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
-                       FunctionCall2Coll(procinfo,
-                                                         index->rd_indcollation[0],
-                                                         PointerGetDatum(&in),
-                                                         PointerGetDatum(&out));
+                       if (!isnull)
+                       {
+                               /* use user-defined choose method */
+                               procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
+                               FunctionCall2Coll(procinfo,
+                                                                 index->rd_indcollation[0],
+                                                                 PointerGetDatum(&in),
+                                                                 PointerGetDatum(&out));
+                       }
+                       else
+                       {
+                               /* force "match" action (to insert to random subnode) */
+                               out.resultType = spgMatchNode;
+                       }
 
                        if (innerTuple->allTheSame)
                        {
@@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state,
                                        /* Adjust level as per opclass request */
                                        level += out.result.matchNode.levelAdd;
                                        /* Replace leafDatum and recompute leafSize */
-                                       leafDatum = out.result.matchNode.restDatum;
-                                       leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-                                               SpGistGetTypeSize(&state->attType, leafDatum);
+                                       if (!isnull)
+                                       {
+                                               leafDatum = out.result.matchNode.restDatum;
+                                               leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+                                                       SpGistGetTypeSize(&state->attType, leafDatum);
+                                       }
 
                                        /*
                                         * Loop around and attempt to insert the new leafDatum
index cbcf655674ac5434fab090fcef0c4ca134bbbf94..8ff9245e179ac71063804b79db08cb9ca35a2c9e 100644 (file)
@@ -38,18 +38,15 @@ spgistBuildCallback(Relation index, HeapTuple htup, Datum *values,
                                        bool *isnull, bool tupleIsAlive, void *state)
 {
        SpGistBuildState *buildstate = (SpGistBuildState *) state;
+       MemoryContext oldCtx;
 
-       /* SPGiST doesn't index nulls */
-       if (*isnull == false)
-       {
-               /* Work in temp context, and reset it after each tuple */
-               MemoryContext oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+       /* Work in temp context, and reset it after each tuple */
+       oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
 
-               spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values);
+       spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values, *isnull);
 
-               MemoryContextSwitchTo(oldCtx);
-               MemoryContextReset(buildstate->tmpCtx);
-       }
+       MemoryContextSwitchTo(oldCtx);
+       MemoryContextReset(buildstate->tmpCtx);
 }
 
 /*
@@ -65,20 +62,23 @@ spgbuild(PG_FUNCTION_ARGS)
        double          reltuples;
        SpGistBuildState buildstate;
        Buffer          metabuffer,
-                               rootbuffer;
+                               rootbuffer,
+                               nullbuffer;
 
        if (RelationGetNumberOfBlocks(index) != 0)
                elog(ERROR, "index \"%s\" already contains data",
                         RelationGetRelationName(index));
 
        /*
-        * Initialize the meta page and root page
+        * Initialize the meta page and root pages
         */
        metabuffer = SpGistNewBuffer(index);
        rootbuffer = SpGistNewBuffer(index);
+       nullbuffer = SpGistNewBuffer(index);
 
        Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
-       Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_HEAD_BLKNO);
+       Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
+       Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
 
        START_CRIT_SECTION();
 
@@ -86,6 +86,8 @@ spgbuild(PG_FUNCTION_ARGS)
        MarkBufferDirty(metabuffer);
        SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
        MarkBufferDirty(rootbuffer);
+       SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
+       MarkBufferDirty(nullbuffer);
 
        if (RelationNeedsWAL(index))
        {
@@ -104,12 +106,15 @@ spgbuild(PG_FUNCTION_ARGS)
                PageSetTLI(BufferGetPage(metabuffer), ThisTimeLineID);
                PageSetLSN(BufferGetPage(rootbuffer), recptr);
                PageSetTLI(BufferGetPage(rootbuffer), ThisTimeLineID);
+               PageSetLSN(BufferGetPage(nullbuffer), recptr);
+               PageSetTLI(BufferGetPage(nullbuffer), ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
 
        UnlockReleaseBuffer(metabuffer);
        UnlockReleaseBuffer(rootbuffer);
+       UnlockReleaseBuffer(nullbuffer);
 
        /*
         * Now insert all the heap data into the index
@@ -159,11 +164,20 @@ spgbuildempty(PG_FUNCTION_ARGS)
        /* Likewise for the root page. */
        SpGistInitPage(page, SPGIST_LEAF);
 
-       smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_HEAD_BLKNO,
+       smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
+                         (char *) page, true);
+       if (XLogIsNeeded())
+               log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
+                                       SPGIST_ROOT_BLKNO, page);
+
+       /* Likewise for the null-tuples root page. */
+       SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
+
+       smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
                          (char *) page, true);
        if (XLogIsNeeded())
                log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
-                                       SPGIST_HEAD_BLKNO, page);
+                                       SPGIST_NULL_BLKNO, page);
 
        /*
         * An immediate sync is required even if we xlog'd the pages, because the
@@ -194,10 +208,6 @@ spginsert(PG_FUNCTION_ARGS)
        MemoryContext oldCtx;
        MemoryContext insertCtx;
 
-       /* SPGiST doesn't index nulls */
-       if (*isnull)
-               PG_RETURN_BOOL(false);
-
        insertCtx = AllocSetContextCreate(CurrentMemoryContext,
                                                                          "SP-GiST insert temporary context",
                                                                          ALLOCSET_DEFAULT_MINSIZE,
@@ -207,7 +217,7 @@ spginsert(PG_FUNCTION_ARGS)
 
        initSpGistState(&spgstate, index);
 
-       spgdoinsert(index, &spgstate, ht_ctid, *values);
+       spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull);
 
        SpGistUpdateMetaPage(index);
 
index 99b0852611fbc7454d3e1ef04d7446d89f12b380..7a3a96230d176b6d13e78a771373a55c04802f4f 100644 (file)
@@ -23,6 +23,9 @@
 #include "utils/memutils.h"
 
 
+typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
+                                                          Datum leafValue, bool isnull, bool recheck);
+
 typedef struct ScanStackEntry
 {
        Datum           reconstructedValue;             /* value reconstructed from parent */
@@ -66,14 +69,20 @@ resetSpGistScanOpaque(SpGistScanOpaque so)
 
        freeScanStack(so);
 
-       Assert(!so->searchNulls);       /* XXX fixme */
+       if (so->searchNulls)
+       {
+               /* Stack a work item to scan the null index entries */
+               startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
+               ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
+               so->scanStack = lappend(so->scanStack, startEntry);
+       }
 
        if (so->searchNonNulls)
        {
                /* Stack a work item to scan the non-null index entries */
                startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-               ItemPointerSet(&startEntry->ptr, SPGIST_HEAD_BLKNO, FirstOffsetNumber);
-               so->scanStack = list_make1(startEntry);
+               ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
+               so->scanStack = lappend(so->scanStack, startEntry);
        }
 
        if (so->want_itup)
@@ -243,22 +252,35 @@ spgrestrpos(PG_FUNCTION_ARGS)
 }
 
 /*
- * Test whether a leaf datum satisfies all the scan keys
+ * Test whether a leaf tuple satisfies all the scan keys
  *
  * *leafValue is set to the reconstructed datum, if provided
  * *recheck is set true if any of the operators are lossy
  */
 static bool
-spgLeafTest(Relation index, SpGistScanOpaque so, Datum leafDatum,
+spgLeafTest(Relation index, SpGistScanOpaque so,
+                       SpGistLeafTuple leafTuple, bool isnull,
                        int level, Datum reconstructedValue,
                        Datum *leafValue, bool *recheck)
 {
        bool            result;
+       Datum           leafDatum;
        spgLeafConsistentIn in;
        spgLeafConsistentOut out;
        FmgrInfo   *procinfo;
        MemoryContext oldCtx;
 
+       if (isnull)
+       {
+               /* Should not have arrived on a nulls page unless nulls are wanted */
+               Assert(so->searchNulls);
+               *leafValue = (Datum) 0;
+               *recheck = false;
+               return true;
+       }
+
+       leafDatum = SGLTDATUM(leafTuple, &so->state);
+
        /* use temp context for calling leaf_consistent */
        oldCtx = MemoryContextSwitchTo(so->tempCxt);
 
@@ -295,7 +317,7 @@ spgLeafTest(Relation index, SpGistScanOpaque so, Datum leafDatum,
  */
 static void
 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
-               void (*storeRes) (SpGistScanOpaque, ItemPointer, Datum, bool))
+               storeRes_func storeRes)
 {
        Buffer          buffer = InvalidBuffer;
        bool            reportedSome = false;
@@ -306,6 +328,7 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
                BlockNumber blkno;
                OffsetNumber offset;
                Page            page;
+               bool            isnull;
 
                /* Pull next to-do item from the list */
                if (so->scanStack == NIL)
@@ -336,6 +359,8 @@ redirect:
 
                page = BufferGetPage(buffer);
 
+               isnull = SpGistPageStoresNulls(page) ? true : false;
+
                if (SpGistPageIsLeaf(page))
                {
                        SpGistLeafTuple leafTuple;
@@ -343,7 +368,7 @@ redirect:
                        Datum           leafValue = (Datum) 0;
                        bool            recheck = false;
 
-                       if (blkno == SPGIST_HEAD_BLKNO)
+                       if (SpGistBlockIsRoot(blkno))
                        {
                                /* When root is a leaf, examine all its tuples */
                                for (offset = FirstOffsetNumber; offset <= max; offset++)
@@ -359,13 +384,14 @@ redirect:
 
                                        Assert(ItemPointerIsValid(&leafTuple->heapPtr));
                                        if (spgLeafTest(index, so,
-                                                                       SGLTDATUM(leafTuple, &so->state),
+                                                                       leafTuple, isnull,
                                                                        stackEntry->level,
                                                                        stackEntry->reconstructedValue,
                                                                        &leafValue,
                                                                        &recheck))
                                        {
-                                               storeRes(so, &leafTuple->heapPtr, leafValue, recheck);
+                                               storeRes(so, &leafTuple->heapPtr,
+                                                                leafValue, isnull, recheck);
                                                reportedSome = true;
                                        }
                                }
@@ -404,13 +430,14 @@ redirect:
 
                                        Assert(ItemPointerIsValid(&leafTuple->heapPtr));
                                        if (spgLeafTest(index, so,
-                                                                       SGLTDATUM(leafTuple, &so->state),
+                                                                       leafTuple, isnull,
                                                                        stackEntry->level,
                                                                        stackEntry->reconstructedValue,
                                                                        &leafValue,
                                                                        &recheck))
                                        {
-                                               storeRes(so, &leafTuple->heapPtr, leafValue, recheck);
+                                               storeRes(so, &leafTuple->heapPtr,
+                                                                leafValue, isnull, recheck);
                                                reportedSome = true;
                                        }
 
@@ -468,11 +495,23 @@ redirect:
 
                        memset(&out, 0, sizeof(out));
 
-                       procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
-                       FunctionCall2Coll(procinfo,
-                                                         index->rd_indcollation[0],
-                                                         PointerGetDatum(&in),
-                                                         PointerGetDatum(&out));
+                       if (!isnull)
+                       {
+                               /* use user-defined inner consistent method */
+                               procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
+                               FunctionCall2Coll(procinfo,
+                                                                 index->rd_indcollation[0],
+                                                                 PointerGetDatum(&in),
+                                                                 PointerGetDatum(&out));
+                       }
+                       else
+                       {
+                               /* force all children to be visited */
+                               out.nNodes = in.nNodes;
+                               out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
+                               for (i = 0; i < in.nNodes; i++)
+                                       out.nodeNumbers[i] = i;
+                       }
 
                        MemoryContextSwitchTo(oldCtx);
 
@@ -524,7 +563,7 @@ redirect:
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-                       Datum leafValue, bool recheck)
+                       Datum leafValue, bool isnull, bool recheck)
 {
        tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
        so->ntids++;
@@ -551,7 +590,7 @@ spggetbitmap(PG_FUNCTION_ARGS)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-                         Datum leafValue, bool recheck)
+                         Datum leafValue, bool isnull, bool recheck)
 {
        Assert(so->nPtrs < MaxIndexTuplesPerPage);
        so->heapPtrs[so->nPtrs] = *heapPtr;
@@ -562,8 +601,6 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
                 * Reconstruct desired IndexTuple.  We have to copy the datum out of
                 * the temp context anyway, so we may as well create the tuple here.
                 */
-               bool    isnull = false;
-
                so->indexTups[so->nPtrs] = index_form_tuple(so->indexTupDesc,
                                                                                                        &leafValue,
                                                                                                        &isnull);
index 1f88562be78e7c103f11814efdc4b802e8eb9767..46a10f6a20617e165d32e0a79e46f632e650612f 100644 (file)
@@ -148,10 +148,10 @@ SpGistNewBuffer(Relation index)
                        break;                          /* nothing known to FSM */
 
                /*
-                * The root page shouldn't ever be listed in FSM, but just in case it
-                * is, ignore it.
+                * The fixed pages shouldn't ever be listed in FSM, but just in case
+                * one is, ignore it.
                 */
-               if (blkno == SPGIST_HEAD_BLKNO)
+               if (SpGistBlockIsFixed(blkno))
                        continue;
 
                buffer = ReadBuffer(index, blkno);
@@ -226,9 +226,8 @@ SpGistUpdateMetaPage(Relation index)
 }
 
 /* Macro to select proper element of lastUsedPages cache depending on flags */
-#define GET_LUP(c, f)  (((f) & GBUF_LEAF) ? \
-                                                &(c)->lastUsedPages.leafPage : \
-                                                &(c)->lastUsedPages.innerPage[(f) & GBUF_PARITY_MASK])
+/* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
+#define GET_LUP(c, f)  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
 
 /*
  * Allocate and initialize a new buffer of the type and parity specified by
@@ -254,15 +253,21 @@ static Buffer
 allocNewBuffer(Relation index, int flags)
 {
        SpGistCache *cache = spgGetCache(index);
+       uint16          pageflags = 0;
+
+       if (GBUF_REQ_LEAF(flags))
+               pageflags |= SPGIST_LEAF;
+       if (GBUF_REQ_NULLS(flags))
+               pageflags |= SPGIST_NULLS;
 
        for (;;)
        {
                Buffer          buffer;
 
                buffer = SpGistNewBuffer(index);
-               SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0);
+               SpGistInitBuffer(buffer, pageflags);
 
-               if (flags & GBUF_LEAF)
+               if (pageflags & SPGIST_LEAF)
                {
                        /* Leaf pages have no parity concerns, so just use it */
                        return buffer;
@@ -270,9 +275,9 @@ allocNewBuffer(Relation index, int flags)
                else
                {
                        BlockNumber blkno = BufferGetBlockNumber(buffer);
-                       int             blkParity = blkno % 3;
+                       int             blkFlags = GBUF_INNER_PARITY(blkno);
 
-                       if ((flags & GBUF_PARITY_MASK) == blkParity)
+                       if ((flags & GBUF_PARITY_MASK) == blkFlags)
                        {
                                /* Page has right parity, use it */
                                return buffer;
@@ -280,8 +285,10 @@ allocNewBuffer(Relation index, int flags)
                        else
                        {
                                /* Page has wrong parity, record it in cache and try again */
-                               cache->lastUsedPages.innerPage[blkParity].blkno = blkno;
-                               cache->lastUsedPages.innerPage[blkParity].freeSpace =
+                               if (pageflags & SPGIST_NULLS)
+                                       blkFlags |= GBUF_NULLS;
+                               cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
+                               cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
                                        PageGetExactFreeSpace(BufferGetPage(buffer));
                                UnlockReleaseBuffer(buffer);
                        }
@@ -329,8 +336,8 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
                return allocNewBuffer(index, flags);
        }
 
-       /* root page should never be in cache */
-       Assert(lup->blkno != SPGIST_HEAD_BLKNO);
+       /* fixed pages should never be in cache */
+       Assert(!SpGistBlockIsFixed(lup->blkno));
 
        /* If cached freeSpace isn't enough, don't bother looking at the page */
        if (lup->freeSpace >= needSpace)
@@ -355,7 +362,13 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
                if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
                {
                        /* OK to initialize the page */
-                       SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0);
+                       uint16          pageflags = 0;
+
+                       if (GBUF_REQ_LEAF(flags))
+                               pageflags |= SPGIST_LEAF;
+                       if (GBUF_REQ_NULLS(flags))
+                               pageflags |= SPGIST_NULLS;
+                       SpGistInitBuffer(buffer, pageflags);
                        lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
                        *isNew = true;
                        return buffer;
@@ -365,8 +378,8 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
                 * Check that page is of right type and has enough space.  We must
                 * recheck this since our cache isn't necessarily up to date.
                 */
-               if ((flags & GBUF_LEAF) ? SpGistPageIsLeaf(page) :
-                       !SpGistPageIsLeaf(page))
+               if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
+                       (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
                {
                        int                     freeSpace = PageGetExactFreeSpace(page);
 
@@ -407,14 +420,16 @@ SpGistSetLastUsedPage(Relation index, Buffer buffer)
        BlockNumber blkno = BufferGetBlockNumber(buffer);
        int                     flags;
 
-       /* Never enter the root page in cache, though */
-       if (blkno == SPGIST_HEAD_BLKNO)
+       /* Never enter fixed pages (root pages) in cache, though */
+       if (SpGistBlockIsFixed(blkno))
                return;
 
        if (SpGistPageIsLeaf(page))
                flags = GBUF_LEAF;
        else
                flags = GBUF_INNER_PARITY(blkno);
+       if (SpGistPageStoresNulls(page))
+               flags |= GBUF_NULLS;
 
        lup = GET_LUP(cache, flags);
 
@@ -459,6 +474,7 @@ void
 SpGistInitMetapage(Page page)
 {
        SpGistMetaPageData *metadata;
+       int                     i;
 
        SpGistInitPage(page, SPGIST_META);
        metadata = SpGistPageGetMeta(page);
@@ -466,10 +482,8 @@ SpGistInitMetapage(Page page)
        metadata->magicNumber = SPGIST_MAGIC_NUMBER;
 
        /* initialize last-used-page cache to empty */
-       metadata->lastUsedPages.innerPage[0].blkno = InvalidBlockNumber;
-       metadata->lastUsedPages.innerPage[1].blkno = InvalidBlockNumber;
-       metadata->lastUsedPages.innerPage[2].blkno = InvalidBlockNumber;
-       metadata->lastUsedPages.leafPage.blkno = InvalidBlockNumber;
+       for (i = 0; i < SPGIST_CACHED_PAGES; i++)
+               metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
 }
 
 /*
@@ -490,7 +504,7 @@ spgoptions(PG_FUNCTION_ARGS)
 }
 
 /*
- * Get the space needed to store a datum of the indicated type.
+ * Get the space needed to store a non-null datum of the indicated type.
  * Note the result is already rounded up to a MAXALIGN boundary.
  * Also, we follow the SPGiST convention that pass-by-val types are
  * just stored in their Datum representation (compare memcpyDatum).
@@ -511,7 +525,7 @@ SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
 }
 
 /*
- * Copy the given datum to *target
+ * Copy the given non-null datum to *target
  */
 static void
 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
@@ -533,17 +547,20 @@ memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
  * Construct a leaf tuple containing the given heap TID and datum value
  */
 SpGistLeafTuple
-spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum)
+spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
+                                Datum datum, bool isnull)
 {
        SpGistLeafTuple tup;
        unsigned int size;
 
        /* compute space needed (note result is already maxaligned) */
-       size = SGLTHDRSZ + SpGistGetTypeSize(&state->attType, datum);
+       size = SGLTHDRSZ;
+       if (!isnull)
+               size += SpGistGetTypeSize(&state->attType, datum);
 
        /*
         * Ensure that we can replace the tuple with a dead tuple later.  This
-        * test is unnecessary given current tuple layouts, but let's be safe.
+        * test is unnecessary when !isnull, but let's be safe.
         */
        if (size < SGDTSIZE)
                size = SGDTSIZE;
@@ -554,7 +571,8 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum)
        tup->size = size;
        tup->nextOffset = InvalidOffsetNumber;
        tup->heapPtr = *heapPtr;
-       memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
+       if (!isnull)
+               memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
 
        return tup;
 }
index 4598ea8d67fd42854c59a29624683efea91ac451..a09da84a2aac18b9846919f6c81c6ba0d47f3dd2 100644 (file)
@@ -307,7 +307,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer)
 }
 
 /*
- * Vacuum the root page when it is a leaf
+ * Vacuum a root page when it is also a leaf
  *
  * On the root, we just delete any dead leaf tuples; no fancy business
  */
@@ -321,6 +321,7 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
        OffsetNumber i,
                                max = PageGetMaxOffsetNumber(page);
 
+       xlrec.blkno = BufferGetBlockNumber(buffer);
        xlrec.nDelete = 0;
 
        /* Scan page, identify tuples to delete, accumulate stats */
@@ -537,7 +538,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
        }
        else if (SpGistPageIsLeaf(page))
        {
-               if (blkno == SPGIST_HEAD_BLKNO)
+               if (SpGistBlockIsRoot(blkno))
                {
                        vacuumLeafRoot(bds, index, buffer);
                        /* no need for vacuumRedirectAndPlaceholder */
@@ -560,7 +561,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
         * put a new tuple.  Otherwise, check for empty/deletable page, and
         * make sure FSM knows about it.
         */
-       if (blkno != SPGIST_HEAD_BLKNO)
+       if (!SpGistBlockIsRoot(blkno))
        {
                /* If page is now empty, mark it deleted */
                if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
@@ -598,7 +599,7 @@ spgvacuumscan(spgBulkDeleteState *bds)
        /* Finish setting up spgBulkDeleteState */
        initSpGistState(&bds->spgstate, index);
        bds->OldestXmin = GetOldestXmin(true, false);
-       bds->lastFilledBlock = SPGIST_HEAD_BLKNO;
+       bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;
 
        /*
         * Reset counts that will be incremented during the scan; needed in case
@@ -619,7 +620,7 @@ spgvacuumscan(spgBulkDeleteState *bds)
         * delete some deletable tuples.  See more extensive comments about
         * this in btvacuumscan().
         */
-       blkno = SPGIST_HEAD_BLKNO;
+       blkno = SPGIST_METAPAGE_BLKNO + 1;
        for (;;)
        {
                /* Get the current relation length */
@@ -648,6 +649,12 @@ spgvacuumscan(spgBulkDeleteState *bds)
         * XXX disabled because it's unsafe due to possible concurrent inserts.
         * We'd have to rescan the pages to make sure they're still empty, and it
         * doesn't seem worth it.  Note that btree doesn't do this either.
+        *
+        * Another reason not to truncate is that it could invalidate the cached
+        * pages-with-freespace pointers in the metapage and other backends'
+        * relation caches, that is leave them pointing to nonexistent pages.
+        * Adding RelationGetNumberOfBlocks calls to protect the places that use
+        * those pointers would be unduly expensive.
         */
 #ifdef NOT_USED
        if (num_pages > bds->lastFilledBlock + 1)
index daa8ae300bae9125c9c4732cdf65185c5779dec2..8e87e2adc9060ffee2a5dcc4b76f2cd2b7726256 100644 (file)
@@ -84,7 +84,7 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
 
-       buffer = XLogReadBuffer(*node, SPGIST_HEAD_BLKNO, true);
+       buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
        Assert(BufferIsValid(buffer));
        SpGistInitBuffer(buffer, SPGIST_LEAF);
        page = (Page) BufferGetPage(buffer);
@@ -92,6 +92,15 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
        PageSetTLI(page, ThisTimeLineID);
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
+
+       buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true);
+       Assert(BufferIsValid(buffer));
+       SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
+       page = (Page) BufferGetPage(buffer);
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+       MarkBufferDirty(buffer);
+       UnlockReleaseBuffer(buffer);
 }
 
 static void
@@ -116,7 +125,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
                        page = BufferGetPage(buffer);
 
                        if (xldata->newPage)
-                               SpGistInitBuffer(buffer, SPGIST_LEAF);
+                               SpGistInitBuffer(buffer,
+                                                                SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
 
                        if (!XLByteLE(lsn, PageGetLSN(page)))
                        {
@@ -218,7 +228,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
                        page = BufferGetPage(buffer);
 
                        if (xldata->newPage)
-                               SpGistInitBuffer(buffer, SPGIST_LEAF);
+                               SpGistInitBuffer(buffer,
+                                                                SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
 
                        if (!XLByteLE(lsn, PageGetLSN(page)))
                        {
@@ -344,6 +355,7 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
                        {
                                page = BufferGetPage(buffer);
 
+                               /* AddNode is not used for nulls pages */
                                if (xldata->newPage)
                                        SpGistInitBuffer(buffer, 0);
 
@@ -464,6 +476,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
                {
                        page = BufferGetPage(buffer);
 
+                       /* SplitTuple is not used for nulls pages */
                        if (xldata->newPage)
                                SpGistInitBuffer(buffer, 0);
 
@@ -545,7 +558,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
         */
        bbi = 0;
 
-       if (xldata->blknoSrc == SPGIST_HEAD_BLKNO)
+       if (SpGistBlockIsRoot(xldata->blknoSrc))
        {
                /* when splitting root, we touch it only in the guise of new inner */
                srcBuffer = InvalidBuffer;
@@ -557,7 +570,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
                Assert(BufferIsValid(srcBuffer));
                page = (Page) BufferGetPage(srcBuffer);
 
-               SpGistInitBuffer(srcBuffer, SPGIST_LEAF);
+               SpGistInitBuffer(srcBuffer,
+                                                SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
                /* don't update LSN etc till we're done with it */
        }
        else
@@ -612,7 +626,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
                Assert(BufferIsValid(destBuffer));
                page = (Page) BufferGetPage(destBuffer);
 
-               SpGistInitBuffer(destBuffer, SPGIST_LEAF);
+               SpGistInitBuffer(destBuffer,
+                                                SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
                /* don't update LSN etc till we're done with it */
        }
        else
@@ -678,7 +693,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
                        page = BufferGetPage(buffer);
 
                        if (xldata->initInner)
-                               SpGistInitBuffer(buffer, 0);
+                               SpGistInitBuffer(buffer,
+                                                                (xldata->storesNulls ? SPGIST_NULLS : 0));
 
                        if (!XLByteLE(lsn, PageGetLSN(page)))
                        {
@@ -709,7 +725,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
        if (xldata->blknoParent == InvalidBlockNumber)
        {
                /* no parent cause we split the root */
-               Assert(xldata->blknoInner == SPGIST_HEAD_BLKNO);
+               Assert(SpGistBlockIsRoot(xldata->blknoInner));
        }
        else if (xldata->blknoInner != xldata->blknoParent)
        {
@@ -842,7 +858,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
 
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-               buffer = XLogReadBuffer(xldata->node, SPGIST_HEAD_BLKNO, false);
+               buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
                if (BufferIsValid(buffer))
                {
                        page = BufferGetPage(buffer);
@@ -1039,7 +1055,8 @@ spg_desc(StringInfo buf, uint8 xl_info, char *rec)
                        break;
                case XLOG_SPGIST_VACUUM_ROOT:
                        out_target(buf, ((spgxlogVacuumRoot *) rec)->node);
-                       appendStringInfo(buf, "vacuum leaf tuples on root page");
+                       appendStringInfo(buf, "vacuum leaf tuples on root page %u",
+                                                        ((spgxlogVacuumRoot *) rec)->blkno);
                        break;
                case XLOG_SPGIST_VACUUM_REDIRECT:
                        out_target(buf, ((spgxlogVacuumRedirect *) rec)->node);
index 76ea5a1578fc8c5e44b8194be8c33f120f365a7d..aa5a6024189413c399d09627e064ec00cf3e811d 100644 (file)
 
 
 /* Page numbers of fixed-location pages */
-#define SPGIST_METAPAGE_BLKNO   (0)
-#define SPGIST_HEAD_BLKNO               (1)
+#define SPGIST_METAPAGE_BLKNO   (0)    /* metapage */
+#define SPGIST_ROOT_BLKNO               (1)    /* root for normal entries */
+#define SPGIST_NULL_BLKNO               (2)    /* root for null-value entries */
+#define SPGIST_LAST_FIXED_BLKNO         SPGIST_NULL_BLKNO
+
+#define SpGistBlockIsRoot(blkno) \
+       ((blkno) == SPGIST_ROOT_BLKNO || (blkno) == SPGIST_NULL_BLKNO)
+#define SpGistBlockIsFixed(blkno) \
+       ((BlockNumber) (blkno) <= (BlockNumber) SPGIST_LAST_FIXED_BLKNO)
 
 /*
  * Contents of page special space on SPGiST index pages
@@ -42,15 +49,14 @@ typedef SpGistPageOpaqueData *SpGistPageOpaque;
 #define SPGIST_META                    (1<<0)
 #define SPGIST_DELETED         (1<<1)
 #define SPGIST_LEAF                    (1<<2)
+#define SPGIST_NULLS           (1<<3)
 
 #define SpGistPageGetOpaque(page) ((SpGistPageOpaque) PageGetSpecialPointer(page))
 #define SpGistPageIsMeta(page) (SpGistPageGetOpaque(page)->flags & SPGIST_META)
 #define SpGistPageIsDeleted(page) (SpGistPageGetOpaque(page)->flags & SPGIST_DELETED)
 #define SpGistPageSetDeleted(page) (SpGistPageGetOpaque(page)->flags |= SPGIST_DELETED)
-#define SpGistPageSetNonDeleted(page) (SpGistPageGetOpaque(page)->flags &= ~SPGIST_DELETED)
 #define SpGistPageIsLeaf(page) (SpGistPageGetOpaque(page)->flags & SPGIST_LEAF)
-#define SpGistPageSetLeaf(page) (SpGistPageGetOpaque(page)->flags |= SPGIST_LEAF)
-#define SpGistPageSetInner(page) (SpGistPageGetOpaque(page)->flags &= ~SPGIST_LEAF)
+#define SpGistPageStoresNulls(page) (SpGistPageGetOpaque(page)->flags & SPGIST_NULLS)
 
 /*
  * The page ID is for the convenience of pg_filedump and similar utilities,
@@ -67,14 +73,16 @@ typedef SpGistPageOpaqueData *SpGistPageOpaque;
  */
 typedef struct SpGistLastUsedPage
 {
-       BlockNumber blkno;                      /* block number of described page */
-       int                     freeSpace;              /* its free space (could be obsolete!) */
+       BlockNumber blkno;                      /* block number, or InvalidBlockNumber */
+       int                     freeSpace;              /* page's free space (could be obsolete!) */
 } SpGistLastUsedPage;
 
+/* Note: indexes in cachedPage[] match flag assignments for SpGistGetBuffer */
+#define SPGIST_CACHED_PAGES 8
+
 typedef struct SpGistLUPCache
 {
-       SpGistLastUsedPage innerPage[3];        /* one per triple-parity group */
-       SpGistLastUsedPage leafPage;
+       SpGistLastUsedPage cachedPage[SPGIST_CACHED_PAGES];
 } SpGistLUPCache;
 
 /*
@@ -86,7 +94,7 @@ typedef struct SpGistMetaPageData
        SpGistLUPCache lastUsedPages;   /* shared storage of last-used info */
 } SpGistMetaPageData;
 
-#define SPGIST_MAGIC_NUMBER (0xBA0BABED)
+#define SPGIST_MAGIC_NUMBER (0xBA0BABEE)
 
 #define SpGistPageGetMeta(p) \
        ((SpGistMetaPageData *) PageGetContents(p))
@@ -266,7 +274,15 @@ typedef SpGistNodeTupleData *SpGistNodeTuple;
  * node (which must be on the same page).  But when the root page is a leaf
  * page, we don't chain its tuples, so nextOffset is always 0 on the root.
  *
- * size must be a multiple of MAXALIGN
+ * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE
+ * so that the tuple can be converted to REDIRECT status later.  (This
+ * restriction only adds bytes for the null-datum case, otherwise alignment
+ * restrictions force it anyway.)
+ *
+ * In a leaf tuple for a NULL indexed value, there's no useful datum value;
+ * however, the SGDTSIZE limit ensures that's there's a Datum word there
+ * anyway, so SGLTDATUM can be applied safely as long as you don't do
+ * anything with the result.
  */
 typedef struct SpGistLeafTupleData
 {
@@ -397,6 +413,7 @@ typedef struct spgxlogAddLeaf
 
        BlockNumber blknoLeaf;          /* destination page for leaf tuple */
        bool            newPage;                /* init dest page? */
+       bool            storesNulls;    /* page is in the nulls tree? */
        OffsetNumber offnumLeaf;        /* offset where leaf tuple gets placed */
        OffsetNumber offnumHeadLeaf; /* offset of head tuple in chain, if any */
 
@@ -419,6 +436,7 @@ typedef struct spgxlogMoveLeafs
        uint16          nMoves;                 /* number of tuples moved from source page */
        bool            newPage;                /* init dest page? */
        bool            replaceDead;    /* are we replacing a DEAD source tuple? */
+       bool            storesNulls;    /* pages are in the nulls tree? */
 
        BlockNumber blknoParent;        /* where the parent downlink is */
        OffsetNumber offnumParent;
@@ -502,6 +520,8 @@ typedef struct spgxlogPickSplit
        OffsetNumber offnumInner;
        bool            initInner;              /* re-init the Inner page? */
 
+       bool            storesNulls;    /* pages are in the nulls tree? */
+
        BlockNumber blknoParent;        /* where the parent downlink is, if any */
        OffsetNumber offnumParent;
        uint16          nodeI;
@@ -553,9 +573,10 @@ typedef struct spgxlogVacuumLeaf
 
 typedef struct spgxlogVacuumRoot
 {
-       /* vacuum root page when it is a leaf */
+       /* vacuum a root page when it is also a leaf */
        RelFileNode node;
 
+       BlockNumber blkno;                      /* block number to clean */
        uint16          nDelete;                /* number of tuples to delete */
 
        spgxlogState stateSrc;
@@ -580,10 +601,18 @@ typedef struct spgxlogVacuumRedirect
  * page in the same triple-parity group as the specified block number.
  * (Typically, this should be GBUF_INNER_PARITY(parentBlockNumber + 1)
  * to follow the rule described in spgist/README.)
+ * In addition, GBUF_NULLS can be OR'd in to get a page for storage of
+ * null-valued tuples.
+ *
+ * Note: these flag values are used as indexes into lastUsedPages.
  */
-#define GBUF_PARITY_MASK               0x03
-#define GBUF_LEAF                              0x04
+#define GBUF_LEAF                              0x03
 #define GBUF_INNER_PARITY(x)   ((x) % 3)
+#define GBUF_NULLS                             0x04
+
+#define GBUF_PARITY_MASK               0x03
+#define GBUF_REQ_LEAF(flags)   (((flags) & GBUF_PARITY_MASK) == GBUF_LEAF)
+#define GBUF_REQ_NULLS(flags)  ((flags) & GBUF_NULLS)
 
 /* spgutils.c */
 extern SpGistCache *spgGetCache(Relation index);
@@ -598,7 +627,8 @@ extern void SpGistInitBuffer(Buffer b, uint16 f);
 extern void SpGistInitMetapage(Page page);
 extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
 extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
-                                                                               ItemPointer heapPtr, Datum datum);
+                                                                               ItemPointer heapPtr,
+                                                                               Datum datum, bool isnull);
 extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state,
                                                                                Datum label, bool isnull);
 extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
@@ -621,6 +651,6 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
                                                int firststate, int reststate,
                                                BlockNumber blkno, OffsetNumber offnum);
 extern void spgdoinsert(Relation index, SpGistState *state,
-                                               ItemPointer heapPtr, Datum datum);
+                                               ItemPointer heapPtr, Datum datum, bool isnull);
 
 #endif   /* SPGIST_PRIVATE_H */
index 993e3872c7b76fd753a848ab805a40074dab5b72..59fd53d2c5fd73a246ac702f58172347a73b4a48 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201203041
+#define CATALOG_VERSION_NO     201203111
 
 #endif
index 9aac9e953b3696c16bb3c0db7879e259b2ef23e0..0d7ed6857e832c0dd4a02bef4a5e2c9dfaa39124 100644 (file)
@@ -129,7 +129,7 @@ DESCR("GiST index access method");
 DATA(insert OID = 2742 (  gin          0 5 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
 DESCR("GIN index access method");
 #define GIN_AM_OID 2742
-DATA(insert OID = 4000 (  spgist       0 5 f f f f f f f f f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
+DATA(insert OID = 4000 (  spgist       0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
 DESCR("SP-GiST index access method");
 #define SPGIST_AM_OID 4000
 
index b1fcada1be4d0618eb73376295c00a6b9488ab80..b7497b047f74be705992c89915127a59f1d00058 100644 (file)
@@ -68,6 +68,7 @@ CREATE TABLE quad_point_tbl AS
     SELECT point(unique1,unique2) AS p FROM tenk1;
 INSERT INTO quad_point_tbl
     SELECT '(333.0,400.0)'::point FROM generate_series(1,1000);
+INSERT INTO quad_point_tbl VALUES (NULL), (NULL), (NULL);
 CREATE INDEX sp_quad_ind ON quad_point_tbl USING spgist (p);
 CREATE TABLE kd_point_tbl AS SELECT * FROM quad_point_tbl;
 CREATE INDEX sp_kd_ind ON kd_point_tbl USING spgist (p kd_point_ops);
@@ -227,6 +228,24 @@ SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0
  (10,10)
 (4 rows)
 
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+ count 
+-------
+     3
+(1 row)
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+ count 
+-------
+ 11000
+(1 row)
+
+SELECT count(*) FROM quad_point_tbl;
+ count 
+-------
+ 11003
+(1 row)
+
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
  count 
 -------
@@ -678,6 +697,50 @@ SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0
  (10,10)
 (4 rows)
 
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Aggregate
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NULL)
+(3 rows)
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+ count 
+-------
+     3
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Aggregate
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NOT NULL)
+(3 rows)
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+ count 
+-------
+ 11000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Aggregate
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+(2 rows)
+
+SELECT count(*) FROM quad_point_tbl;
+ count 
+-------
+ 11003
+(1 row)
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                         QUERY PLAN                         
@@ -1108,6 +1171,55 @@ SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0
  (10,10)
 (4 rows)
 
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+                  QUERY PLAN                  
+----------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_point_tbl
+         Recheck Cond: (p IS NULL)
+         ->  Bitmap Index Scan on sp_quad_ind
+               Index Cond: (p IS NULL)
+(5 rows)
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+ count 
+-------
+     3
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+                  QUERY PLAN                  
+----------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_point_tbl
+         Recheck Cond: (p IS NOT NULL)
+         ->  Bitmap Index Scan on sp_quad_ind
+               Index Cond: (p IS NOT NULL)
+(5 rows)
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+ count 
+-------
+ 11000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl;
+                  QUERY PLAN                  
+----------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_point_tbl
+         ->  Bitmap Index Scan on sp_quad_ind
+(3 rows)
+
+SELECT count(*) FROM quad_point_tbl;
+ count 
+-------
+ 11003
+(1 row)
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                           QUERY PLAN                           
index 5e5fc22579bb0617ded02385b30efb54fefcbc2f..57f52612dfa6d377f633a3de1e2dc2763a6fa280 100644 (file)
@@ -102,6 +102,8 @@ CREATE TABLE quad_point_tbl AS
 INSERT INTO quad_point_tbl
     SELECT '(333.0,400.0)'::point FROM generate_series(1,1000);
 
+INSERT INTO quad_point_tbl VALUES (NULL), (NULL), (NULL);
+
 CREATE INDEX sp_quad_ind ON quad_point_tbl USING spgist (p);
 
 CREATE TABLE kd_point_tbl AS SELECT * FROM quad_point_tbl;
@@ -172,6 +174,12 @@ SELECT * FROM point_tbl WHERE f1 IS NOT NULL ORDER BY f1 <-> '0,1';
 
 SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0,1';
 
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+
+SELECT count(*) FROM quad_point_tbl;
+
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 
 SELECT count(*) FROM quad_point_tbl WHERE box '(200,200,1000,1000)' @> p;
@@ -305,6 +313,18 @@ EXPLAIN (COSTS OFF)
 SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0,1';
 SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0,1';
 
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl;
+SELECT count(*) FROM quad_point_tbl;
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
@@ -422,6 +442,18 @@ EXPLAIN (COSTS OFF)
 SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0,1';
 SELECT * FROM point_tbl WHERE f1 <@ '(-10,-10),(10,10)':: box ORDER BY f1 <-> '0,1';
 
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+SELECT count(*) FROM quad_point_tbl WHERE p IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT count(*) FROM quad_point_tbl WHERE p IS NOT NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_point_tbl;
+SELECT count(*) FROM quad_point_tbl;
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';