Page page;
Buffer buffer,
newbuf;
- bool need_toast,
- already_marked;
+ bool need_toast;
Size newtupsize,
pagefree;
bool have_tuple_lock = false;
* on the same page as the old, then we need to release the content lock
* (but not the pin!) on the old tuple's buffer while we are off doing
* TOAST and/or table-file-extension work. We must mark the old tuple to
- * show that it's already being updated, else other processes may try to
- * update it themselves.
+ * show that it's locked, else other processes may try to update it
+ * themselves.
*
* We need to invoke the toaster if there are already any out-of-line
* toasted values present, or if the new tuple is over-threshold.
if (need_toast || newtupsize > pagefree)
{
+ /*
+ * To prevent concurrent sessions from updating the tuple, we have to
+ * temporarily mark it locked while we release the buffer content lock.
+ *
+ * To satisfy the rule that any xid potentially appearing in a buffer
+ * written out to disk must first be logged in WAL, we unfortunately
+ * have to WAL log this temporary modification.  We can reuse
+ * xl_heap_lock for this
+ * purpose. If we crash/error before following through with the
+ * actual update, xmax will be of an aborted transaction, allowing
+ * other sessions to proceed.
+ */
+
+
+ START_CRIT_SECTION();
+
/* Clear obsolete visibility flags ... */
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
/* ... and store info about transaction updating this tuple */
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
- /* temporarily make it look not-updated */
+ /* temporarily make it look not-updated, but locked */
+ oldtup.t_data->t_infomask |= HEAP_XMAX_EXCL_LOCK;
oldtup.t_data->t_ctid = oldtup.t_self;
- already_marked = true;
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(relation))
+ {
+ xl_heap_lock xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.tid = oldtup.t_self;
+ xlrec.locking_xid = xid;
+ xlrec.xid_is_mxact = false;
+ xlrec.shared_lock = false;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfHeapLock;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/*
else
{
/* No TOAST work needed, and it'll fit on same page */
- already_marked = false;
newbuf = buffer;
heaptup = newtup;
}
RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */
- if (!already_marked)
- {
- /* Clear obsolete visibility flags ... */
- oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_XMAX_IS_MULTI |
- HEAP_IS_LOCKED |
- HEAP_MOVED);
- /* ... and store info about transaction updating this tuple */
- HeapTupleHeaderSetXmax(oldtup.t_data, xid);
- HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
- }
+ /* Clear obsolete visibility flags, possibly set by ourselves above... */
+ oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+ /* ... and store info about transaction updating this tuple */
+ HeapTupleHeaderSetXmax(oldtup.t_data, xid);
+ HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
/* record address of new tuple in t_ctid of old one */
oldtup.t_data->t_ctid = heaptup->t_self;