]> granicus.if.org Git - postgresql/commitdiff
Only WAL-log the modified portion in an UPDATE, if possible.
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 12 Mar 2014 20:46:04 +0000 (22:46 +0200)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 12 Mar 2014 21:28:36 +0000 (23:28 +0200)
When a row is updated, and the new tuple version is put on the same page as
the old one, only WAL-log the part of the new tuple that's not identical to
the old. This saves significantly on the amount of WAL that needs to be
written, in the common case that most fields are not modified.

Amit Kapila, with a lot of back and forth with me, Robert Haas, and others.

src/backend/access/heap/heapam.c
src/backend/access/transam/xlog.c
src/include/access/heapam_xlog.h
src/include/access/xlog.h

index 71ec74015cdc60cf6a21040c9142cdbac4c214da..e2337acc2abc789fe696ab0bcde6fc77397208bb 100644 (file)
@@ -6605,10 +6605,15 @@ log_heap_update(Relation reln, Buffer oldbuf,
        xl_heap_header_len xlhdr;
        xl_heap_header_len xlhdr_idx;
        uint8           info;
+       uint16          prefix_suffix[2];
+       uint16          prefixlen = 0,
+                               suffixlen = 0;
        XLogRecPtr      recptr;
-       XLogRecData rdata[7];
+       XLogRecData rdata[9];
        Page            page = BufferGetPage(newbuf);
        bool            need_tuple_data = RelationIsLogicallyLogged(reln);
+       int                     nr;
+       Buffer          newbufref;
 
        /* Caller should not call me on a non-WAL-logged relation */
        Assert(RelationNeedsWAL(reln));
@@ -6618,6 +6623,57 @@ log_heap_update(Relation reln, Buffer oldbuf,
        else
                info = XLOG_HEAP_UPDATE;
 
+       /*
+        * If the old and new tuple are on the same page, we only need to log
+        * the parts of the new tuple that were changed.  That saves on the amount
+        * of WAL we need to write.  Currently, we just count any unchanged bytes
+        * in the beginning and end of the tuple.  That's quick to check, and
+        * perfectly covers the common case that only one field is updated.
+        *
+        * We could do this even if the old and new tuple are on different pages,
+        * but only if we don't make a full-page image of the old page, which is
+        * difficult to know in advance.  Also, if the old tuple is corrupt for
+        * some reason, it would allow the corruption to propagate to the new page,
+        * so it seems best to avoid.  Under the general assumption that most
+        * updates tend to create the new tuple version on the same page, there
+        * isn't much to be gained by doing this across pages anyway.
+        *
+        * Skip this if we're taking a full-page image of the new page, as we don't
+        * include the new tuple in the WAL record in that case.  Also disable if
+        * wal_level='logical', as logical decoding needs to be able to read the
+        * new tuple in whole from the WAL record alone.
+        */
+       if (oldbuf == newbuf && !need_tuple_data &&
+               !XLogCheckBufferNeedsBackup(newbuf))
+       {
+               char       *oldp = (char *) oldtup->t_data + oldtup->t_data->t_hoff;
+               char       *newp = (char *) newtup->t_data + newtup->t_data->t_hoff;
+               int                     oldlen = oldtup->t_len - oldtup->t_data->t_hoff;
+               int                     newlen = newtup->t_len - newtup->t_data->t_hoff;
+
+               /* Check for common prefix between old and new tuple */
+               for (prefixlen = 0; prefixlen < Min(oldlen, newlen); prefixlen++)
+               {
+                       if (newp[prefixlen] != oldp[prefixlen])
+                               break;
+               }
+               /*
+                * Storing the length of the prefix takes 2 bytes, so we need to save
+                * at least 3 bytes or there's no point.
+                */
+               if (prefixlen < 3)
+                       prefixlen = 0;
+
+               /* Same for suffix */
+               for (suffixlen = 0; suffixlen < Min(oldlen, newlen) - prefixlen; suffixlen++)
+               {
+                       if (newp[newlen - suffixlen - 1] != oldp[oldlen - suffixlen - 1])
+                               break;
+               }
+               if (suffixlen < 3)
+                       suffixlen = 0;
+       }
+
        xlrec.target.node = reln->rd_node;
        xlrec.target.tid = oldtup->t_self;
        xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
@@ -6630,41 +6686,119 @@ log_heap_update(Relation reln, Buffer oldbuf,
        xlrec.newtid = newtup->t_self;
        if (new_all_visible_cleared)
                xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
+       if (prefixlen > 0)
+               xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
+       if (suffixlen > 0)
+               xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
 
-       rdata[0].data = (char *) &xlrec;
-       rdata[0].len = SizeOfHeapUpdate;
-       rdata[0].buffer = InvalidBuffer;
+       /* If new tuple is the single and first tuple on page... */
+       if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
+               PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+       {
+               info |= XLOG_HEAP_INIT_PAGE;
+               newbufref = InvalidBuffer;
+       }
+       else
+               newbufref = newbuf;
+
+       rdata[0].data = NULL;
+       rdata[0].len = 0;
+       rdata[0].buffer = oldbuf;
+       rdata[0].buffer_std = true;
        rdata[0].next = &(rdata[1]);
 
-       rdata[1].data = NULL;
-       rdata[1].len = 0;
-       rdata[1].buffer = oldbuf;
-       rdata[1].buffer_std = true;
+       rdata[1].data = (char *) &xlrec;
+       rdata[1].len = SizeOfHeapUpdate;
+       rdata[1].buffer = InvalidBuffer;
        rdata[1].next = &(rdata[2]);
 
+       /* prefix and/or suffix length fields */
+       if (prefixlen > 0 || suffixlen > 0)
+       {
+               if (prefixlen > 0 && suffixlen > 0)
+               {
+                       prefix_suffix[0] = prefixlen;
+                       prefix_suffix[1] = suffixlen;
+                       rdata[2].data = (char *) &prefix_suffix;
+                       rdata[2].len = 2 * sizeof(uint16);
+               }
+               else if (prefixlen > 0)
+               {
+                       rdata[2].data = (char *) &prefixlen;
+                       rdata[2].len = sizeof(uint16);
+               }
+               else
+               {
+                       rdata[2].data = (char *) &suffixlen;
+                       rdata[2].len = sizeof(uint16);
+               }
+               rdata[2].buffer = newbufref;
+               rdata[2].buffer_std = true;
+               rdata[2].next = &(rdata[3]);
+               nr = 3;
+       }
+       else
+               nr = 2;
+
        xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
        xlhdr.header.t_infomask = newtup->t_data->t_infomask;
        xlhdr.header.t_hoff = newtup->t_data->t_hoff;
-       xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+       Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len);
+       xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - prefixlen - suffixlen;
 
        /*
-        * As with insert records, we need not store the rdata[2] segment
-        * if we decide to store the whole buffer instead unless we're
-        * doing logical decoding.
+        * As with insert records, we need not store this rdata segment if we
+        * decide to store the whole buffer instead, unless we're doing logical
+        * decoding.
         */
-       rdata[2].data = (char *) &xlhdr;
-       rdata[2].len = SizeOfHeapHeaderLen;
-       rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
-       rdata[2].buffer_std = true;
-       rdata[2].next = &(rdata[3]);
+       rdata[nr].data = (char *) &xlhdr;
+       rdata[nr].len = SizeOfHeapHeaderLen;
+       rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+       rdata[nr].buffer_std = true;
+       rdata[nr].next = &(rdata[nr + 1]);
+       nr++;
 
-       /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-       rdata[3].data = (char *) newtup->t_data
-               + offsetof(HeapTupleHeaderData, t_bits);
-       rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-       rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
-       rdata[3].buffer_std = true;
-       rdata[3].next = NULL;
+       /*
+        * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
+        *
+        * The 'data' doesn't include the common prefix or suffix.
+        */
+       if (prefixlen == 0)
+       {
+               rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+               rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - suffixlen;
+               rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+               rdata[nr].buffer_std = true;
+               rdata[nr].next = NULL;
+               nr++;
+       }
+       else
+       {
+               /*
+                * Have to write the null bitmap and data after the common prefix as
+                * two separate rdata entries.
+                */
+               /* bitmap [+ padding] [+ oid] */
+               if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) > 0)
+               {
+                       rdata[nr - 1].next = &(rdata[nr]);
+                       rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+                       rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+                       rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+                       rdata[nr].buffer_std = true;
+                       rdata[nr].next = NULL;
+                       nr++;
+               }
+
+               /* data after common prefix */
+               rdata[nr - 1].next = &(rdata[nr]);
+               rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen;
+               rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen;
+               rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+               rdata[nr].buffer_std = true;
+               rdata[nr].next = NULL;
+               nr++;
+       }
 
        /*
         * Separate storage for the FPW buffer reference of the new page in the
@@ -6672,13 +6806,15 @@ log_heap_update(Relation reln, Buffer oldbuf,
        */
        if (need_tuple_data)
        {
-               rdata[3].next = &(rdata[4]);
+               rdata[nr - 1].next = &(rdata[nr]);
+
+               rdata[nr].data = NULL,
+               rdata[nr].len = 0;
+               rdata[nr].buffer = newbufref;
+               rdata[nr].buffer_std = true;
+               rdata[nr].next = NULL;
+               nr++;
 
-               rdata[4].data = NULL,
-               rdata[4].len = 0;
-               rdata[4].buffer = newbuf;
-               rdata[4].buffer_std = true;
-               rdata[4].next = NULL;
                xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
 
                /* We need to log a tuple identity */
@@ -6690,19 +6826,21 @@ log_heap_update(Relation reln, Buffer oldbuf,
                        xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
                        xlhdr_idx.t_len = old_key_tuple->t_len;
 
-                       rdata[4].next = &(rdata[5]);
-                       rdata[5].data = (char *) &xlhdr_idx;
-                       rdata[5].len = SizeOfHeapHeaderLen;
-                       rdata[5].buffer = InvalidBuffer;
-                       rdata[5].next = &(rdata[6]);
+                       rdata[nr - 1].next = &(rdata[nr]);
+                       rdata[nr].data = (char *) &xlhdr_idx;
+                       rdata[nr].len = SizeOfHeapHeaderLen;
+                       rdata[nr].buffer = InvalidBuffer;
+                       rdata[nr].next = &(rdata[nr + 1]);
+                       nr++;
 
                        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-                       rdata[6].data = (char *) old_key_tuple->t_data
+                       rdata[nr].data = (char *) old_key_tuple->t_data
                                + offsetof(HeapTupleHeaderData, t_bits);
-                       rdata[6].len = old_key_tuple->t_len
+                       rdata[nr].len = old_key_tuple->t_len
                                - offsetof(HeapTupleHeaderData, t_bits);
-                       rdata[6].buffer = InvalidBuffer;
-                       rdata[6].next = NULL;
+                       rdata[nr].buffer = InvalidBuffer;
+                       rdata[nr].next = NULL;
+                       nr++;
 
                        if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
                                xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
@@ -6711,19 +6849,6 @@ log_heap_update(Relation reln, Buffer oldbuf,
                }
        }
 
-       /* If new tuple is the single and first tuple on page... */
-       if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
-               PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
-       {
-               XLogRecData *rcur = &rdata[2];
-               info |= XLOG_HEAP_INIT_PAGE;
-               while (rcur != NULL)
-               {
-                       rcur->buffer = InvalidBuffer;
-                       rcur = rcur->next;
-               }
-       }
-
        recptr = XLogInsert(RM_HEAP_ID, info, rdata);
 
        return recptr;
@@ -7750,17 +7875,25 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
        Page            page;
        OffsetNumber offnum;
        ItemId          lp = NULL;
+       HeapTupleData oldtup;
        HeapTupleHeader htup;
+       char       *recdata;
+       uint16          prefixlen = 0,
+                               suffixlen = 0;
+       char       *newp;
        struct
        {
                HeapTupleHeaderData hdr;
                char            data[MaxHeapTupleSize];
        }                       tbuf;
        xl_heap_header_len xlhdr;
-       int                     hsize;
        uint32          newlen;
        Size            freespace;
 
+       /* initialize to keep the compiler quiet */
+       oldtup.t_data = NULL;
+       oldtup.t_len = 0;
+
        /*
         * The visibility map may need to be fixed even if the heap page is
         * already up-to-date.
@@ -7827,6 +7960,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 
        htup = (HeapTupleHeader) PageGetItem(page, lp);
 
+       oldtup.t_data = htup;
+       oldtup.t_len = ItemIdGetLength(lp);
+
        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        if (hot_update)
@@ -7925,20 +8061,63 @@ newsame:;
        if (PageGetMaxOffsetNumber(page) + 1 < offnum)
                elog(PANIC, "heap_update_redo: invalid max offset number");
 
-       hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
+       recdata = (char *) xlrec + SizeOfHeapUpdate;
 
-       memcpy((char *) &xlhdr,
-                  (char *) xlrec + SizeOfHeapUpdate,
-                  SizeOfHeapHeaderLen);
-       newlen = xlhdr.t_len;
-       Assert(newlen <= MaxHeapTupleSize);
+       if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
+       {
+               memcpy(&prefixlen, recdata, sizeof(uint16));
+               recdata += sizeof(uint16);
+       }
+       if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
+       {
+               memcpy(&suffixlen, recdata, sizeof(uint16));
+               recdata += sizeof(uint16);
+       }
+
+       memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
+       recdata += SizeOfHeapHeaderLen;
+
+       Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
        htup = &tbuf.hdr;
        MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
-       /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-       memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
-                  (char *) xlrec + hsize,
-                  newlen);
-       newlen += offsetof(HeapTupleHeaderData, t_bits);
+
+       /*
+        * Reconstruct the new tuple using the prefix and/or suffix from the old
+        * tuple, and the data stored in the WAL record.
+        */
+       newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits);
+       if (prefixlen > 0)
+       {
+               int                     len;
+
+               /* copy bitmap [+ padding] [+ oid] from WAL record */
+               len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+               memcpy(newp, recdata, len);
+               recdata += len;
+               newp += len;
+
+               /* copy prefix from old tuple */
+               memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+               newp += prefixlen;
+
+               /* copy new tuple data from WAL record */
+               len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+               memcpy(newp, recdata, len);
+               recdata += len;
+               newp += len;
+       }
+       else
+       {
+               /* copy bitmap [+ padding] [+ oid] + data from record, all in one go */
+               memcpy(newp, recdata, xlhdr.t_len);
+               recdata += xlhdr.t_len;
+               newp += xlhdr.t_len;
+       }
+       /* copy suffix from old tuple */
+       if (suffixlen > 0)
+               memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+       newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen;
        htup->t_infomask2 = xlhdr.header.t_infomask2;
        htup->t_infomask = xlhdr.header.t_infomask;
        htup->t_hoff = xlhdr.header.t_hoff;
index cdbe305f952f1b77d663d379c06bf5d26b200edd..141edf43278baef8ae257c9238db54bdbe8a5311 100644 (file)
@@ -2335,6 +2335,29 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
        return result;
 }
 
+/*
+ * Determine whether the buffer referenced has to be backed up: returns true
+ * if page writes are enabled or forced and the page's LSN is <= RedoRecPtr.
+ *
+ * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
+ * could change later, so use the result for optimization purposes only.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+       bool            doPageWrites;
+       Page            page;
+
+       page = BufferGetPage(buffer);
+
+       doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
+
+       if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+               return true;                    /* buffer requires backup */
+
+       return false;                           /* buffer does not need to be backed up */
+}
+
 /*
  * Determine whether the buffer referenced by an XLogRecData item has to
  * be backed up, and if so fill a BkpBlock struct for it.  In any case
index 194635952cb763491fefaf83ee8093311bf72522..d6bc8f7f24f339846f8363d64c7617d84f6b3aa6 100644 (file)
@@ -67,6 +67,8 @@
 #define XLOG_HEAP_CONTAINS_OLD_TUPLE           (1<<2)
 #define XLOG_HEAP_CONTAINS_OLD_KEY                     (1<<3)
 #define XLOG_HEAP_CONTAINS_NEW_TUPLE           (1<<4)
+#define XLOG_HEAP_PREFIX_FROM_OLD                      (1<<5)
+#define XLOG_HEAP_SUFFIX_FROM_OLD                      (1<<6)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLOG_HEAP_CONTAINS_OLD                                                 \
@@ -179,7 +181,22 @@ typedef struct xl_heap_update
        ItemPointerData newtid;         /* new inserted tuple id */
        uint8           old_infobits_set;       /* infomask bits to set on old tuple */
        uint8           flags;
-       /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
+
+       /*
+        * If XLOG_HEAP_PREFIX_FROM_OLD or XLOG_HEAP_SUFFIX_FROM_OLD flags are
+        * set, the prefix and/or suffix lengths come next, as one or two uint16s.
+        *
+        * After that, xl_heap_header_len and new tuple data follow.  The new
+        * tuple data and length don't include the prefix and suffix, which are
+        * copied from the old tuple on replay.  The new tuple data is omitted if
+        * a full-page image of the page was taken (unless the
+        * XLOG_HEAP_CONTAINS_NEW_TUPLE flag is set, in which case it's included
+        * anyway).
+        *
+        * If XLOG_HEAP_CONTAINS_OLD_TUPLE or XLOG_HEAP_CONTAINS_OLD_KEY flags are
+        * set, another xl_heap_header_len struct and tuple data for the old tuple
+        * follows.
+        */
 } xl_heap_update;
 
 #define SizeOfHeapUpdate       (offsetof(xl_heap_update, flags) + sizeof(uint8))
index a238292b76ead2c4dc368ac8b7309b764a6a8f59..35092284664184ed47fe10c0800c22cfe0c375a6 100644 (file)
@@ -279,6 +279,7 @@ typedef struct CheckpointStatsData
 extern CheckpointStatsData CheckpointStats;
 
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
+extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);