#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
bool synchronous_replication = false; /* Only set in user backends */
char *SyncRepStandbyNames;
-static bool sync_standbys_defined = false; /* Is there at least one name? */
+#define SyncStandbysDefined() \
+ (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+
static bool announce_next_takeover = true;
static void SyncRepQueueInsert(void);
+static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
/*
* Wait for synchronous replication, if requested by user.
+ *
+ * Initially backends start in state SYNC_REP_NOT_WAITING and then
+ * change that state to SYNC_REP_WAITING before adding ourselves
+ * to the wait queue. During SyncRepWakeQueue() a WALSender changes
+ * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
+ * This backend then resets its state to SYNC_REP_NOT_WAITING.
*/
void
SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* there are no sync replication standby names defined.
* Note that those standbys don't need to be connected.
*/
- if (!SyncRepRequested() || !sync_standbys_defined)
+ if (!SyncRepRequested() || !SyncStandbysDefined())
return;
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+ Assert(WalSndCtl != NULL);
+
+ /* Reset the latch before adding ourselves to the queue. */
+ ResetLatch(&MyProc->waitLatch);
+
+ /*
+ * Set our waitLSN so WALSender will know when to wake us, and add
+ * ourselves to the queue.
+ */
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
+ if (!WalSndCtl->sync_standbys_defined)
+ {
+ /*
+ * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is
+ * not set. See SyncRepUpdateSyncStandbysDefined.
+ */
+ LWLockRelease(SyncRepLock);
+ return;
+ }
+ MyProc->waitLSN = XactCommitLSN;
+ MyProc->syncRepState = SYNC_REP_WAITING;
+ SyncRepQueueInsert();
+ Assert(SyncRepQueueIsOrderedByLSN());
+ LWLockRelease(SyncRepLock);
+
+ /* Alter ps display to show waiting for sync rep. */
+ if (update_process_title)
+ {
+ int len;
+
+ old_status = get_ps_display(&len);
+ new_status = (char *) palloc(len + 32 + 1);
+ memcpy(new_status, old_status, len);
+ sprintf(new_status + len, " waiting for %X/%X",
+ XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
+ set_ps_display(new_status, false);
+ new_status[len] = '\0'; /* truncate off " waiting ..." */
+ }
/*
* Wait for specified LSN to be confirmed.
*/
for (;;)
{
+ int syncRepState;
+
+ /*
+ * Wait on latch for up to 60 seconds. This allows us to
+ * check for postmaster death regularly while waiting.
+ * Note that timeout here does not necessarily release from loop.
+ */
+ WaitLatch(&MyProc->waitLatch, 60000000L);
+
+ /* Must reset the latch before testing state. */
ResetLatch(&MyProc->waitLatch);
/*
- * Synchronous Replication state machine within user backend
- *
- * Initially backends start in state SYNC_REP_NOT_WAITING and then
- * change that state to SYNC_REP_WAITING before adding ourselves
- * to the wait queue. During SyncRepWakeQueue() a WALSender changes
- * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
- * This backend then resets its state to SYNC_REP_NOT_WAITING when
- * we exit normally, or SYNC_REP_MUST_DISCONNECT in abnormal cases.
- *
- * We read MyProc->syncRepState without SyncRepLock, which
- * assumes that read access is atomic.
+ * Try checking the state without the lock first. There's no guarantee
+ * that we'll read the most up-to-date value, so if it looks like we're
+ * still waiting, recheck while holding the lock. But if it looks like
+ * we're done, we must really be done, because once walsender changes
+ * the state to SYNC_REP_WAIT_COMPLETE, it will never update it again,
+ * so we can't be seeing a stale value in that case.
*/
- switch (MyProc->syncRepState)
+ syncRepState = MyProc->syncRepState;
+ if (syncRepState == SYNC_REP_WAITING)
{
- case SYNC_REP_NOT_WAITING:
- /*
- * Set our waitLSN so WALSender will know when to wake us.
- * We set this before we add ourselves to queue, so that
- * any proc on the queue can be examined freely without
- * taking a lock on each process in the queue, as long as
- * they hold SyncRepLock.
- */
- MyProc->waitLSN = XactCommitLSN;
- MyProc->syncRepState = SYNC_REP_WAITING;
-
- /*
- * Add to queue while holding lock.
- */
- LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- SyncRepQueueInsert();
- Assert(SyncRepQueueIsOrderedByLSN());
- LWLockRelease(SyncRepLock);
-
- /*
- * Alter ps display to show waiting for sync rep.
- */
- if (update_process_title)
- {
- int len;
-
- old_status = get_ps_display(&len);
- new_status = (char *) palloc(len + 32 + 1);
- memcpy(new_status, old_status, len);
- sprintf(new_status + len, " waiting for %X/%X",
- XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
- set_ps_display(new_status, false);
- new_status[len] = '\0'; /* truncate off " waiting ..." */
- }
-
- break;
-
- case SYNC_REP_WAITING:
- /*
- * Check for conditions that would cause us to leave the
- * wait state before the LSN has been reached.
- */
- if (!PostmasterIsAlive(true))
- {
- LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- SHMQueueDelete(&(MyProc->syncRepLinks));
- LWLockRelease(SyncRepLock);
-
- MyProc->syncRepState = SYNC_REP_MUST_DISCONNECT;
- return;
- }
-
- /*
- * We don't receive SIGHUPs at this point, so resetting
- * synchronous_standby_names has no effect on waiters.
- */
-
- /* Continue waiting */
-
- break;
-
- case SYNC_REP_WAIT_COMPLETE:
- /*
- * WalSender has checked our LSN and has removed us from
- * queue. Cleanup local state and leave.
- */
- Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
-
- MyProc->syncRepState = SYNC_REP_NOT_WAITING;
- MyProc->waitLSN.xlogid = 0;
- MyProc->waitLSN.xrecoff = 0;
-
- if (new_status)
- {
- /* Reset ps display */
- set_ps_display(new_status, false);
- pfree(new_status);
- }
-
- return;
-
- case SYNC_REP_MUST_DISCONNECT:
- return;
-
- default:
- elog(FATAL, "invalid syncRepState");
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ syncRepState = MyProc->syncRepState;
+ LWLockRelease(SyncRepLock);
}
+ if (syncRepState == SYNC_REP_WAIT_COMPLETE)
+ break;
/*
- * Wait on latch for up to 60 seconds. This allows us to
- * check for postmaster death regularly while waiting.
- * Note that timeout here does not necessarily release from loop.
+ * If a wait for synchronous replication is pending, we can neither
+ * acknowledge the commit nor raise ERROR or FATAL. The latter
+ * would lead the client to believe that that the transaction
+ * aborted, which is not true: it's already committed locally.
+ * The former is no good either: the client has requested
+ * synchronous replication, and is entitled to assume that an
+ * acknowledged commit is also replicated, which may not be true.
+ * So in this case we issue a WARNING (which some clients may
+ * be able to interpret) and shut off further output. We do NOT
+ * reset ProcDiePending, so that the process will die after the
+ * commit is cleaned up.
*/
- WaitLatch(&MyProc->waitLatch, 60000000L);
+ if (ProcDiePending)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("canceling the wait for replication and terminating connection due to administrator command"),
+ errdetail("The transaction has already been committed locally but might have not been replicated to the standby.")));
+ whereToSendOutput = DestNone;
+ SyncRepCancelWait();
+ break;
+ }
+
+ /*
+ * It's unclear what to do if a query cancel interrupt arrives. We
+ * can't actually abort at this point, but ignoring the interrupt
+ * altogether is not helpful, so we just terminate the wait with
+ * a suitable warning.
+ */
+ if (QueryCancelPending)
+ {
+ QueryCancelPending = false;
+ ereport(WARNING,
+ (errmsg("canceling wait for synchronous replication due to user request"),
+ errdetail("The transaction has committed locally, but may not have replicated to the standby.")));
+ SyncRepCancelWait();
+ break;
+ }
+
+ /*
+ * If the postmaster dies, we'll probably never get an acknowledgement,
+ * because all the wal sender processes will exit. So just bail out.
+ */
+ if (!PostmasterIsAlive(true))
+ {
+ ProcDiePending = true;
+ whereToSendOutput = DestNone;
+ SyncRepCancelWait();
+ break;
+ }
+ }
+
+ /*
+ * WalSender has checked our LSN and has removed us from queue. Clean up
+ * state and leave. It's OK to reset these shared memory fields without
+ * holding SyncRepLock, because any walsenders will ignore us anyway when
+ * we're not on the queue.
+ */
+ Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+ MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+
+ if (new_status)
+ {
+ /* Reset ps display */
+ set_ps_display(new_status, false);
+ pfree(new_status);
}
}
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
}
+/*
+ * Acquire SyncRepLock and cancel any wait currently in progress.
+ */
+static void
+SyncRepCancelWait(void)
+{
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+ SHMQueueDelete(&(MyProc->syncRepLinks));
+ MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+ LWLockRelease(SyncRepLock);
+}
+
void
SyncRepCleanupAtProcExit(int code, Datum arg)
{
return numprocs;
}
+/*
+ * WAL writer calls this as needed to update the shared sync_standbys_defined
+ * flag, so that backends don't remain permanently wedged if
+ * synchronous_standby_names is unset. It's safe to check the current value
+ * without the lock, because it's only ever updated by one process. But we
+ * must take the lock to change it.
+ */
+void
+SyncRepUpdateSyncStandbysDefined(void)
+{
+ bool sync_standbys_defined = SyncStandbysDefined();
+
+ if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
+ {
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+
+ /*
+ * If synchronous_standby_names has been reset to empty, it's futile
+ * for backends to continue to waiting. Since the user no longer
+ * wants synchronous replication, we'd better wake them up.
+ */
+ if (!sync_standbys_defined)
+ SyncRepWakeQueue(true);
+
+ /*
+ * Only allow people to join the queue when there are synchronous
+ * standbys defined. Without this interlock, there's a race
+ * condition: we might wake up all the current waiters; then, some
+ * backend that hasn't yet reloaded its config might go to sleep on
+ * the queue (and never wake up). This prevents that.
+ */
+ WalSndCtl->sync_standbys_defined = sync_standbys_defined;
+
+ LWLockRelease(SyncRepLock);
+ }
+}
+
#ifdef USE_ASSERT_CHECKING
static bool
SyncRepQueueIsOrderedByLSN(void)
return NULL;
}
- /*
- * Is there at least one sync standby? If so cache this knowledge to
- * improve performance of SyncRepWaitForLSN() for all-async configs.
- */
- if (doit && list_length(elemlist) > 0)
- sync_standbys_defined = true;
-
/*
* Any additional validation of standby names should go here.
*