granicus.if.org Git - postgresql/blobdiff - src/backend/access/gin/ginxlog.c
Rewrite the way GIN posting lists are packed on a page, to reduce WAL volume.
[postgresql] / src / backend / access / gin / ginxlog.c
index c13e01a3c6a55ce63cd9ca7ab999fee6a9355968..02e566cc685396947a9b7357905d2d51655fa0ce 100644 (file)
@@ -78,7 +78,7 @@ static void
 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
-       ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
+       char       *ptr;
        Buffer          buffer;
        Page            page;
 
@@ -89,9 +89,14 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
-       GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
-       memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
-       GinPageGetOpaque(page)->maxoff = data->nitem;
+       GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
+
+       ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
+
+       /* Place page data */
+       memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
+
+       GinDataLeafPageSetPostingListSize(page, data->size);
 
        PageSetLSN(page, lsn);
 
@@ -100,11 +105,11 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
 }
 
 static void
-ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
-                                  void *rdata)
+ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
 {
        Page            page = BufferGetPage(buffer);
        ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
+       OffsetNumber offset = data->offset;
        IndexTuple      itup;
 
        if (rightblkno != InvalidBlockNumber)
@@ -138,30 +143,187 @@ ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
 }
 
 static void
-ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
-                                 void *rdata)
+ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
+{
+       int                     actionno;       /* index of the action being replayed */
+       int                     segno;  /* old-page number of the segment oldseg is at */
+       GinPostingList *oldseg; /* current position among the page's segments */
+       Pointer         segmentend;     /* end of segment data; moves as actions apply */
+       char       *walbuf;     /* read cursor in the record's action stream */
+       int                     totalsize;      /* byte size of all segment data on the page */
+
+       /*
+        * If the page is in pre-9.4 (uncompressed) format, convert to new format first.
+        */
+       if (!GinPageIsCompressed(page))
+       {
+               ItemPointer uncompressed = (ItemPointer) GinDataPageGetData(page);
+               int                     nuncompressed = GinPageGetOpaque(page)->maxoff; /* old format: maxoff is the item count */
+               int                     npacked;
+               GinPostingList *plist;
+
+               plist = ginCompressPostingList(uncompressed, nuncompressed,
+                                                                          BLCKSZ, &npacked);   /* NOTE(review): confirm nuncompressed == 0 is handled */
+               Assert(npacked == nuncompressed);       /* all items must land in the one list */
+
+               totalsize = SizeOfGinPostingList(plist);
+
+               memcpy(GinDataLeafPageGetPostingList(page), plist, totalsize);
+               GinDataLeafPageSetPostingListSize(page, totalsize);
+               GinPageSetCompressed(page);
+               GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;   /* no longer used as an item count */
+       }
+
+       oldseg = GinDataLeafPageGetPostingList(page);   /* start at the first segment */
+       segmentend = (Pointer) oldseg + GinDataLeafPageGetPostingListSize(page);
+       segno = 0;
+
+       walbuf = ((char *) data) + sizeof(ginxlogRecompressDataLeaf);   /* actions follow the fixed header */
+       for (actionno = 0; actionno < data->nactions; actionno++)
+       {
+               uint8           a_segno = *((uint8 *) (walbuf++));      /* 1st byte: target segment number */
+               uint8           a_action = *((uint8 *) (walbuf++));     /* 2nd byte: action code */
+               GinPostingList *newseg = NULL;
+               int                     newsegsize = 0;
+               ItemPointerData *items = NULL;
+               uint16          nitems = 0;
+               ItemPointerData *olditems;
+               int                     nolditems;
+               ItemPointerData *newitems;
+               int                     nnewitems;
+               int                     segsize;
+               Pointer         segptr;
+               int                     szleft;
+
+               /* Extract the action's payload (if any) and advance walbuf past it */
+               if (a_action == GIN_SEGMENT_INSERT ||
+                       a_action == GIN_SEGMENT_REPLACE)
+               {
+                       newseg = (GinPostingList *) walbuf;     /* used in place, not copied */
+                       newsegsize = SizeOfGinPostingList(newseg);
+                       walbuf += SHORTALIGN(newsegsize);       /* payload length is rounded up by SHORTALIGN */
+               }
+
+               if (a_action == GIN_SEGMENT_ADDITEMS)
+               {
+                       memcpy(&nitems, walbuf, sizeof(uint16));        /* count of item pointers that follow */
+                       walbuf += sizeof(uint16);
+                       items = (ItemPointerData *) walbuf;
+                       walbuf += nitems * sizeof(ItemPointerData);
+               }
+
+               /* Skip forward over untouched segments to the one this action concerns */
+               Assert(segno <= a_segno);       /* actions must come in segment order */
+               while (segno < a_segno)
+               {
+                       oldseg = GinNextPostingListSegment(oldseg);
+                       segno++;
+               }
+
+               /*
+                * ADDITEMS action is handled like REPLACE, but the new segment to
+                * replace the old one is reconstructed using the old segment from
+                * disk and the new items from the WAL record.
+                */
+               if (a_action == GIN_SEGMENT_ADDITEMS)
+               {
+                       int                     npacked;
+
+                       olditems = ginPostingListDecode(oldseg, &nolditems);
+
+                       newitems = ginMergeItemPointers(items, nitems,
+                                                                                       olditems, nolditems,
+                                                                                       &nnewitems);
+                       Assert(nnewitems == nolditems + nitems);        /* merge must not drop any item */
+
+                       newseg = ginCompressPostingList(newitems, nnewitems,
+                                                                                       BLCKSZ, &npacked);
+                       Assert(npacked == nnewitems);
+
+                       newsegsize = SizeOfGinPostingList(newseg);
+                       a_action = GIN_SEGMENT_REPLACE; /* handled by the REPLACE case below */
+               }
+
+               segptr = (Pointer) oldseg;
+               if (segptr != segmentend)
+                       segsize = SizeOfGinPostingList(oldseg);
+               else
+               {
+                       /*
+                        * Positioned after the last existing segment. Only INSERTs
+                        * expected here.
+                        */
+                       Assert(a_action == GIN_SEGMENT_INSERT);
+                       segsize = 0;
+               }
+               szleft = segmentend - segptr;   /* bytes from this segment to end of data */
+
+               switch (a_action)
+               {
+                       case GIN_SEGMENT_DELETE:
+                               memmove(segptr, segptr + segsize, szleft - segsize);    /* close the gap */
+                               segmentend -= segsize;
+
+                               segno++;        /* segptr now holds the next old segment */
+                               break;
+
+                       case GIN_SEGMENT_INSERT:
+                               /* make room for the new segment */
+                               memmove(segptr + newsegsize, segptr, szleft);
+                               /* copy the new segment in place */
+                               memcpy(segptr, newseg, newsegsize);
+                               segmentend += newsegsize;
+                               segptr += newsegsize;   /* segno unchanged: same old segment follows */
+                               break;
+
+                       case GIN_SEGMENT_REPLACE:
+                               /* shift the segments that follow */
+                               memmove(segptr + newsegsize,
+                                               segptr + segsize,
+                                               szleft - segsize);
+                               /* copy the replacement segment in place */
+                               memcpy(segptr, newseg, newsegsize);
+                               segmentend -= segsize;
+                               segmentend += newsegsize;
+                               segptr += newsegsize;
+                               segno++;        /* the old segment has been consumed */
+                               break;
+
+                       default:
+                               elog(ERROR, "unexpected GIN leaf action: %u", a_action);
+               }
+               oldseg = (GinPostingList *) segptr;     /* resume from the next old segment */
+       }
+
+       totalsize = segmentend - (Pointer) GinDataLeafPageGetPostingList(page);
+       GinDataLeafPageSetPostingListSize(page, totalsize);     /* record the final data size */
+}
+
+static void
+ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
 {
        Page            page = BufferGetPage(buffer);
 
-       if (GinPageIsLeaf(page))
+       if (isLeaf)     /* caller's flag selects how rdata is interpreted */
        {
-               ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
-               ItemPointerData *items = data->items;
-               OffsetNumber i;
+               ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;  /* leaf payload: recompression actions */
+
+               Assert(GinPageIsLeaf(page));    /* caller's flag must agree with the page */
 
-               for (i = 0; i < data->nitem; i++)
-                       GinDataPageAddItemPointer(page, &items[i], offset + i);
+               ginRedoRecompress(page, data);  /* replay the segment actions on the page */
        }
        else
        {
-               PostingItem *pitem = (PostingItem *) rdata;
+               ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;  /* internal payload: offset and new item */
                PostingItem *oldpitem;
 
+               Assert(!GinPageIsLeaf(page));   /* caller's flag must agree with the page */
+
                /* update link to right page after split */
-               oldpitem = GinDataPageGetPostingItem(page, offset);
+               oldpitem = GinDataPageGetPostingItem(page, data->offset);       /* offset now comes from the payload */
                PostingItemSetBlockNumber(oldpitem, rightblkno);
 
-               GinDataPageAddPostingItem(page, pitem, offset);
+               GinDataPageAddPostingItem(page, &data->newitem, data->offset);  /* add the new item at the same offset */
        }
 }
 
@@ -213,12 +375,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                if (data->flags & GIN_INSERT_ISDATA)
                {
                        Assert(GinPageIsData(page));
-                       ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
+                       ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
                }
                else
                {
                        Assert(!GinPageIsData(page));
-                       ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
+                       ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
                }
 
                PageSetLSN(page, lsn);
@@ -253,38 +415,42 @@ ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
 static void
 ginRedoSplitData(Page lpage, Page rpage, void *rdata)
 {
-       ginxlogSplitData *data = (ginxlogSplitData *) rdata;
        bool            isleaf = GinPageIsLeaf(lpage);
-       char       *ptr = (char *) rdata + sizeof(ginxlogSplitData);
-       OffsetNumber i;
-       ItemPointer bound;
 
        if (isleaf)
        {
-               ItemPointer items = (ItemPointer) ptr;
-               for (i = 0; i < data->separator; i++)
-                       GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
-               for (i = data->separator; i < data->nitem; i++)
-                       GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
+               ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata;
+               Pointer         lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf);  /* left page's data follows the header */
+               Pointer         rptr = lptr + data->lsize;      /* right page's data follows the left's */
+
+               Assert(data->lsize > 0 && data->lsize <= GinDataLeafMaxContentSize);    /* each half must be non-empty and fit */
+               Assert(data->rsize > 0 && data->rsize <= GinDataLeafMaxContentSize);
+
+               memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize);        /* copy each half verbatim from the record */
+               memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize);
+
+               GinDataLeafPageSetPostingListSize(lpage, data->lsize);
+               GinDataLeafPageSetPostingListSize(rpage, data->rsize);
+               *GinDataPageGetRightBound(lpage) = data->lrightbound;   /* both right bounds come from the record */
+               *GinDataPageGetRightBound(rpage) = data->rrightbound;
        }
        else
        {
-               PostingItem *items = (PostingItem *) ptr;
+               ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata;
+               PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal));       /* items follow the header */
+               OffsetNumber i;
+               OffsetNumber maxoff;
+
                for (i = 0; i < data->separator; i++)
                        GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
                for (i = data->separator; i < data->nitem; i++)
                        GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
-       }
-
-       /* set up right key */
-       bound = GinDataPageGetRightBound(lpage);
-       if (isleaf)
-               *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
-       else
-               *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
 
-       bound = GinDataPageGetRightBound(rpage);
-       *bound = data->rightbound;
+               /* set up right keys: left from its last posting item, right from the record */
+               maxoff = GinPageGetOpaque(lpage)->maxoff;
+               *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key;
+               *GinDataPageGetRightBound(rpage) = data->rightbound;
+       }
 }
 
 static void
@@ -317,9 +483,10 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
        if (isLeaf)
                flags |= GIN_LEAF;
-
        if (isData)
                flags |= GIN_DATA;
+       if (isLeaf && isData)
+               flags |= GIN_COMPRESSED;
 
        lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
        Assert(BufferIsValid(lbuffer));
@@ -352,7 +519,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
                Buffer          rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
                Page            rootPage = BufferGetPage(rootBuf);
 
-               GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
+               GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED);
 
                if (isData)
                {
@@ -383,13 +550,20 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        UnlockReleaseBuffer(lbuffer);
 }
 
+/*
+ * This is functionally the same as heap_xlog_newpage.
+ */
 static void
 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
 {
-       ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
+       ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
+       char       *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage);
        Buffer          buffer;
        Page            page;
 
+       Assert(xlrec->hole_offset < BLCKSZ);
+       Assert(xlrec->hole_length < BLCKSZ);
+
        /* If we have a full-page image, restore it and we're done */
        if (record->xl_info & XLR_BKP_BLOCK(0))
        {
@@ -397,41 +571,56 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
                return;
        }
 
-       buffer = XLogReadBuffer(data->node, data->blkno, false);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
        if (!BufferIsValid(buffer))
                return;
        page = (Page) BufferGetPage(buffer);
 
-       if (lsn > PageGetLSN(page))
+       if (xlrec->hole_length == 0)
        {
-               if (GinPageIsData(page))
-               {
-                       memcpy(GinDataPageGetData(page),
-                                  XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-                                  data->nitem * GinSizeOfDataPageItem(page));
-                       GinPageGetOpaque(page)->maxoff = data->nitem;
-               }
-               else
-               {
-                       OffsetNumber i,
-                                          *tod;
-                       IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+               memcpy((char *) page, blk, BLCKSZ);
+       }
+       else
+       {
+               memcpy((char *) page, blk, xlrec->hole_offset);
+               /* must zero-fill the hole */
+               MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length);
+               memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length),
+                          blk + xlrec->hole_offset,
+                          BLCKSZ - (xlrec->hole_offset + xlrec->hole_length));
+       }
 
-                       tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-                       for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-                               tod[i - 1] = i;
+       PageSetLSN(page, lsn);
 
-                       PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+       MarkBufferDirty(buffer);
+       UnlockReleaseBuffer(buffer);
+}
 
-                       for (i = 0; i < data->nitem; i++)
-                       {
-                               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-                                       elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                                data->node.spcNode, data->node.dbNode, data->node.relNode);
-                               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
-                       }
-               }
+static void
+ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
+{
+       ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+
+       /* If we have a full-page image, restore it and we're done */
+       if (record->xl_info & XLR_BKP_BLOCK(0))
+       {
+               (void) RestoreBackupBlock(lsn, record, 0, false, false);
+               return;
+       }
 
+       buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
+       if (!BufferIsValid(buffer))
+               return;
+       page = (Page) BufferGetPage(buffer);
+
+       Assert(GinPageIsLeaf(page));
+       Assert(GinPageIsData(page));
+
+       if (lsn > PageGetLSN(page))
+       {
+               ginRedoRecompress(page, &xlrec->data);
                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
        }
@@ -518,17 +707,19 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
        Page            metapage;
        Buffer          buffer;
 
+       /*
+        * Restore the metapage. This is essentially the same as a full-page image,
+        * so restore the metapage unconditionally without looking at the LSN, to
+        * avoid torn page hazards.
+        */
        metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
        if (!BufferIsValid(metabuffer))
                return;                                 /* assume index was deleted, nothing to do */
        metapage = BufferGetPage(metabuffer);
 
-       if (lsn > PageGetLSN(metapage))
-       {
-               memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
-               PageSetLSN(metapage, lsn);
-               MarkBufferDirty(metabuffer);
-       }
+       memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
+       PageSetLSN(metapage, lsn);
+       MarkBufferDirty(metabuffer);
 
        if (data->ntuples > 0)
        {
@@ -678,12 +869,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
                return;                                 /* assume index was deleted, nothing to do */
        metapage = BufferGetPage(metabuffer);
 
-       if (lsn > PageGetLSN(metapage))
-       {
-               memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
-               PageSetLSN(metapage, lsn);
-               MarkBufferDirty(metabuffer);
-       }
+       memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
+       PageSetLSN(metapage, lsn);
+       MarkBufferDirty(metabuffer);
 
        /*
         * In normal operation, shiftList() takes exclusive lock on all the
@@ -747,6 +935,9 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
                case XLOG_GIN_VACUUM_PAGE:
                        ginRedoVacuumPage(lsn, record);
                        break;
+               case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
+                       ginRedoVacuumDataLeafPage(lsn, record);
+                       break;
                case XLOG_GIN_DELETE_PAGE:
                        ginRedoDeletePage(lsn, record);
                        break;