xl_heap_header_len xlhdr;
xl_heap_header_len xlhdr_idx;
uint8 info;
+ uint16 prefix_suffix[2];
+ uint16 prefixlen = 0,
+ suffixlen = 0;
XLogRecPtr recptr;
- XLogRecData rdata[7];
+ XLogRecData rdata[9];
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
+ int nr;
+ Buffer newbufref;
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
else
info = XLOG_HEAP_UPDATE;
+ /*
+ * If the old and new tuple are on the same page, we only need to log
+ * the parts of the new tuple that were changed. That saves on the amount
+ * of WAL we need to write. Currently, we just count any unchanged bytes
+ * in the beginning and end of the tuple. That's quick to check, and
+ * perfectly covers the common case that only one field is updated.
+ *
+ * We could do this even if the old and new tuple are on different pages,
+ * but only if we don't make a full-page image of the old page, which is
+ * difficult to know in advance. Also, if the old tuple is corrupt for
+ * some reason, it would allow the corruption to propagate to the new page,
+ * so it seems best to avoid. Under the general assumption that most
+ * updates tend to create the new tuple version on the same page, there
+ * isn't much to be gained by doing this across pages anyway.
+ *
+ * Skip this if we're taking a full-page image of the new page, as we don't
+ * include the new tuple in the WAL record in that case. Also disable if
+ * wal_level='logical', as logical decoding needs to be able to read the
+ * new tuple in whole from the WAL record alone.
+ */
+ if (oldbuf == newbuf && !need_tuple_data &&
+ !XLogCheckBufferNeedsBackup(newbuf))
+ {
+ char *oldp = (char *) oldtup->t_data + oldtup->t_data->t_hoff;
+ char *newp = (char *) newtup->t_data + newtup->t_data->t_hoff;
+ int oldlen = oldtup->t_len - oldtup->t_data->t_hoff;
+ int newlen = newtup->t_len - newtup->t_data->t_hoff;
+
+ /* Check for common prefix between old and new tuple */
+ for (prefixlen = 0; prefixlen < Min(oldlen, newlen); prefixlen++)
+ {
+ if (newp[prefixlen] != oldp[prefixlen])
+ break;
+ }
+ /*
+ * Storing the length of the prefix takes 2 bytes, so we need to save
+ * at least 3 bytes or there's no point.
+ */
+ if (prefixlen < 3)
+ prefixlen = 0;
+
+ /* Same for suffix */
+ for (suffixlen = 0; suffixlen < Min(oldlen, newlen) - prefixlen; suffixlen++)
+ {
+ if (newp[newlen - suffixlen - 1] != oldp[oldlen - suffixlen - 1])
+ break;
+ }
+ if (suffixlen < 3)
+ suffixlen = 0;
+ }
+
xlrec.target.node = reln->rd_node;
xlrec.target.tid = oldtup->t_self;
xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
xlrec.newtid = newtup->t_self;
if (new_all_visible_cleared)
xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
+ if (prefixlen > 0)
+ xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
+ if (suffixlen > 0)
+ xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapUpdate;
- rdata[0].buffer = InvalidBuffer;
+ /* If new tuple is the single and first tuple on page... */
+ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+ {
+ info |= XLOG_HEAP_INIT_PAGE;
+ newbufref = InvalidBuffer;
+ }
+ else
+ newbufref = newbuf;
+
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].buffer = oldbuf;
+ rdata[0].buffer_std = true;
rdata[0].next = &(rdata[1]);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = oldbuf;
- rdata[1].buffer_std = true;
+ rdata[1].data = (char *) &xlrec;
+ rdata[1].len = SizeOfHeapUpdate;
+ rdata[1].buffer = InvalidBuffer;
rdata[1].next = &(rdata[2]);
+ /* prefix and/or suffix length fields */
+ if (prefixlen > 0 || suffixlen > 0)
+ {
+ if (prefixlen > 0 && suffixlen > 0)
+ {
+ prefix_suffix[0] = prefixlen;
+ prefix_suffix[1] = suffixlen;
+ rdata[2].data = (char *) &prefix_suffix;
+ rdata[2].len = 2 * sizeof(uint16);
+ }
+ else if (prefixlen > 0)
+ {
+ rdata[2].data = (char *) &prefixlen;
+ rdata[2].len = sizeof(uint16);
+ }
+ else
+ {
+ rdata[2].data = (char *) &suffixlen;
+ rdata[2].len = sizeof(uint16);
+ }
+ rdata[2].buffer = newbufref;
+ rdata[2].buffer_std = true;
+ rdata[2].next = &(rdata[3]);
+ nr = 3;
+ }
+ else
+ nr = 2;
+
xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
xlhdr.header.t_infomask = newtup->t_data->t_infomask;
xlhdr.header.t_hoff = newtup->t_data->t_hoff;
- xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len);
+ xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - prefixlen - suffixlen;
/*
- * As with insert records, we need not store the rdata[2] segment
- * if we decide to store the whole buffer instead unless we're
- * doing logical decoding.
+ * As with insert records, we need not store this rdata segment if we
+ * decide to store the whole buffer instead, unless we're doing logical
+ * decoding.
*/
- rdata[2].data = (char *) &xlhdr;
- rdata[2].len = SizeOfHeapHeaderLen;
- rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
+ rdata[nr].data = (char *) &xlhdr;
+ rdata[nr].len = SizeOfHeapHeaderLen;
+ rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+ rdata[nr].buffer_std = true;
+ rdata[nr].next = &(rdata[nr + 1]);
+ nr++;
- /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[3].data = (char *) newtup->t_data
- + offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
+ /*
+ * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
+ *
+ * The 'data' doesn't include the common prefix or suffix.
+ */
+ if (prefixlen == 0)
+ {
+ rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - suffixlen;
+ rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+ rdata[nr].buffer_std = true;
+ rdata[nr].next = NULL;
+ nr++;
+ }
+ else
+ {
+ /*
+ * Have to write the null bitmap and data after the common prefix as
+ * two separate rdata entries.
+ */
+ /* bitmap [+ padding] [+ oid] */
+ if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) > 0)
+ {
+ rdata[nr - 1].next = &(rdata[nr]);
+ rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+ rdata[nr].buffer_std = true;
+ rdata[nr].next = NULL;
+ nr++;
+ }
+
+ /* data after common prefix */
+ rdata[nr - 1].next = &(rdata[nr]);
+ rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen;
+ rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen;
+ rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
+ rdata[nr].buffer_std = true;
+ rdata[nr].next = NULL;
+ nr++;
+ }
/*
* Separate storage for the FPW buffer reference of the new page in the
* need_tuple_data (logical decoding) case.
*/
if (need_tuple_data)
{
- rdata[3].next = &(rdata[4]);
+ rdata[nr - 1].next = &(rdata[nr]);
+
+ rdata[nr].data = NULL,
+ rdata[nr].len = 0;
+ rdata[nr].buffer = newbufref;
+ rdata[nr].buffer_std = true;
+ rdata[nr].next = NULL;
+ nr++;
- rdata[4].data = NULL,
- rdata[4].len = 0;
- rdata[4].buffer = newbuf;
- rdata[4].buffer_std = true;
- rdata[4].next = NULL;
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
/* We need to log a tuple identity */
xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
xlhdr_idx.t_len = old_key_tuple->t_len;
- rdata[4].next = &(rdata[5]);
- rdata[5].data = (char *) &xlhdr_idx;
- rdata[5].len = SizeOfHeapHeaderLen;
- rdata[5].buffer = InvalidBuffer;
- rdata[5].next = &(rdata[6]);
+ rdata[nr - 1].next = &(rdata[nr]);
+ rdata[nr].data = (char *) &xlhdr_idx;
+ rdata[nr].len = SizeOfHeapHeaderLen;
+ rdata[nr].buffer = InvalidBuffer;
+ rdata[nr].next = &(rdata[nr + 1]);
+ nr++;
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[6].data = (char *) old_key_tuple->t_data
+ rdata[nr].data = (char *) old_key_tuple->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
- rdata[6].len = old_key_tuple->t_len
+ rdata[nr].len = old_key_tuple->t_len
- offsetof(HeapTupleHeaderData, t_bits);
- rdata[6].buffer = InvalidBuffer;
- rdata[6].next = NULL;
+ rdata[nr].buffer = InvalidBuffer;
+ rdata[nr].next = NULL;
+ nr++;
if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
}
}
- /* If new tuple is the single and first tuple on page... */
- if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
- PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
- {
- XLogRecData *rcur = &rdata[2];
- info |= XLOG_HEAP_INIT_PAGE;
- while (rcur != NULL)
- {
- rcur->buffer = InvalidBuffer;
- rcur = rcur->next;
- }
- }
-
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
return recptr;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
+ HeapTupleData oldtup;
HeapTupleHeader htup;
+ char *recdata;
+ uint16 prefixlen = 0,
+ suffixlen = 0;
+ char *newp;
struct
{
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
xl_heap_header_len xlhdr;
- int hsize;
uint32 newlen;
Size freespace;
+ /* initialize to keep the compiler quiet */
+ oldtup.t_data = NULL;
+ oldtup.t_len = 0;
+
/*
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
htup = (HeapTupleHeader) PageGetItem(page, lp);
+ oldtup.t_data = htup;
+ oldtup.t_len = ItemIdGetLength(lp);
+
htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
if (hot_update)
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
- hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
+ recdata = (char *) xlrec + SizeOfHeapUpdate;
- memcpy((char *) &xlhdr,
- (char *) xlrec + SizeOfHeapUpdate,
- SizeOfHeapHeaderLen);
- newlen = xlhdr.t_len;
- Assert(newlen <= MaxHeapTupleSize);
+ if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
+ {
+ memcpy(&prefixlen, recdata, sizeof(uint16));
+ recdata += sizeof(uint16);
+ }
+ if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
+ {
+ memcpy(&suffixlen, recdata, sizeof(uint16));
+ recdata += sizeof(uint16);
+ }
+
+ memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
+ recdata += SizeOfHeapHeaderLen;
+
+ Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
- /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
- memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + hsize,
- newlen);
- newlen += offsetof(HeapTupleHeaderData, t_bits);
+
+ /*
+ * Reconstruct the new tuple using the prefix and/or suffix from the old
+ * tuple, and the data stored in the WAL record.
+ */
+ newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits);
+ if (prefixlen > 0)
+ {
+ int len;
+
+ /* copy bitmap [+ padding] [+ oid] from WAL record */
+ len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+ memcpy(newp, recdata, len);
+ recdata += len;
+ newp += len;
+
+ /* copy prefix from old tuple */
+ memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+ newp += prefixlen;
+
+ /* copy new tuple data from WAL record */
+ len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+ memcpy(newp, recdata, len);
+ recdata += len;
+ newp += len;
+ }
+ else
+ {
+ /* copy bitmap [+ padding] [+ oid] + data from record, all in one go */
+ memcpy(newp, recdata, xlhdr.t_len);
+ recdata += xlhdr.t_len;
+ newp += xlhdr.t_len;
+ }
+ /* copy suffix from old tuple */
+ if (suffixlen > 0)
+ memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+ newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen;
htup->t_infomask2 = xlhdr.header.t_infomask2;
htup->t_infomask = xlhdr.header.t_infomask;
htup->t_hoff = xlhdr.header.t_hoff;