#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
#include "utils/timestamp.h"
+#include "utils/tqual.h"
/*
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer);
+ char *page_buffer,
+ Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
}
}
- /* Queue any pending notifies */
+ /* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
ListCell *nextNotify;
* have already committed before we started to LISTEN.
*
* Note that we are not yet listening on anything, so we won't deliver any
- * notification to the frontend.
+ * notification to the frontend. Also, although our transaction might
+ * have executed NOTIFY, those message(s) aren't queued yet so we can't
+ * see them in the queue.
*
* This will also advance the global tail pointer if possible.
*/
volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
+ Snapshot snapshot;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
return;
}
+ /* Get snapshot we'll use to decide which xacts are still in progress */
+ snapshot = RegisterSnapshot(GetLatestSnapshot());
+
/*----------
* Note that we deliver everything that we see in the queue and that
* matches our _current_ listening state.
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf);
+ page_buffer.buf,
+ snapshot);
} while (!reachedStop);
}
PG_CATCH();
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
+
+ /* Done with snapshot */
+ UnregisterSnapshot(snapshot);
}
/*
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer)
+ char *page_buffer,
+ Snapshot snapshot)
{
bool reachedStop = false;
bool reachedEndOfPage;
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
- if (TransactionIdIsInProgress(qe->xid))
+ if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
* The source transaction is still in progress, so we can't
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
- * Note that we must test TransactionIdIsInProgress before we
- * test TransactionIdDidCommit, else we might return a message
- * from a transaction that is not yet visible to snapshots;
- * compare the comments at the head of tqual.c.
+ * Note that we must test XidInMVCCSnapshot before we test
+ * TransactionIdDidCommit, else we might return a message from
+ * a transaction that is not yet visible to snapshots; compare
+ * the comments at the head of tqual.c.
+ *
+ * Also, while our own xact won't be listed in the snapshot,
+ * we need not check for TransactionIdIsCurrentTransactionId
+ * because our transaction cannot (yet) have queued any
+ * messages.
*/
*current = thisentry;
reachedStop = true;
SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
-/* local functions */
-static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
/*
*
* Note: GetSnapshotData never stores either top xid or subxids of our own
* backend into a snapshot, so these xids will not be reported as "running"
- * by this function. This is OK for current uses, because we actually only
- * apply this for known-committed XIDs.
+ * by this function. This is OK for current uses, because we always check
+ * TransactionIdIsCurrentTransactionId first, except when it's known the
+ * XID could not be ours anyway.
*/
-static bool
+bool
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;