--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * barrier.c
+ * Barriers for synchronizing cooperating processes.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * From Wikipedia[1]: "In parallel computing, a barrier is a type of
+ * synchronization method. A barrier for a group of threads or processes in
+ * the source code means any thread/process must stop at this point and cannot
+ * proceed until all other threads/processes reach this barrier."
+ *
+ * This implementation of barriers allows for static sets of participants
+ * known up front, or dynamic sets of participants which processes can join or
+ * leave at any time. In the dynamic case, a phase number can be used to
+ * track progress through a parallel algorithm, and may be necessary to
+ * synchronize with the current phase of a multi-phase algorithm when a new
+ * participant joins. In the static case, the phase number is used
+ * internally, but it isn't strictly necessary for client code to access it
+ * because the phase can only advance when the declared number of participants
+ * reaches the barrier, so client code should be in no doubt about the current
+ * phase of computation at all times.
+ *
+ * Consider a parallel algorithm that involves separate phases of computation
+ * A, B and C where the output of each phase is needed before the next phase
+ * can begin.
+ *
+ * In the case of a static barrier initialized with 4 participants, each
+ * participant works on phase A, then calls BarrierArriveAndWait to wait until
+ * all 4 participants have reached that point. When BarrierArriveAndWait
+ * returns control, each participant can work on B, and so on. Because the
+ * barrier knows how many participants to expect, the phases of computation
+ * don't need labels or numbers, since each process's program counter implies
+ * the current phase. Even if some of the processes are slow to start up and
+ * begin running phase A, the other participants are expecting them and will
+ * patiently wait at the barrier. The code could be written as follows:
+ *
+ * perform_a();
+ * BarrierArriveAndWait(&barrier, ...);
+ * perform_b();
+ * BarrierArriveAndWait(&barrier, ...);
+ * perform_c();
+ * BarrierArriveAndWait(&barrier, ...);
+ *
+ * If the number of participants is not known up front, then a dynamic barrier
+ * is needed and the number should be set to zero at initialization. New
+ * complications arise because the number necessarily changes over time as
+ * participants attach and detach, and therefore phases B, C or even the end
+ * of processing may be reached before any given participant has started
+ * running and attached. Therefore the client code must perform an initial
+ * test of the phase number after attaching, because it needs to find out
+ * which phase of the algorithm has been reached by any participants that are
+ * already attached in order to synchronize with that work. Once the program
+ * counter or some other representation of current progress is synchronized
+ * with the barrier's phase, normal control flow can be used just as in the
+ * static case. Our example could be written using a switch statement with
+ * cases that fall-through, as follows:
+ *
+ * phase = BarrierAttach(&barrier);
+ * switch (phase)
+ * {
+ * case PHASE_A:
+ * perform_a();
+ * BarrierArriveAndWait(&barrier, ...);
+ * case PHASE_B:
+ * perform_b();
+ * BarrierArriveAndWait(&barrier, ...);
+ * case PHASE_C:
+ * perform_c();
+ * BarrierArriveAndWait(&barrier, ...);
+ * }
+ * BarrierDetach(&barrier);
+ *
+ * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic
+ * barriers behave similarly to Java's java.util.concurrent.Phaser.
+ *
+ * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
+ *
+ * IDENTIFICATION
+ * src/backend/storage/ipc/barrier.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "storage/barrier.h"
+
+static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
+
+/*
+ * Initialize this barrier. To use a static party size, provide the number of
+ * participants to wait for at each phase indicating that that number of
+ * backends is implicitly attached. To use a dynamic party size, specify zero
+ * here and then use BarrierAttach() and
+ * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
+ * participants explicitly.
+ */
+void
+BarrierInit(Barrier *barrier, int participants)
+{
+ SpinLockInit(&barrier->mutex);
+ barrier->participants = participants;
+ barrier->arrived = 0;
+ barrier->phase = 0;
+ barrier->elected = 0;
+ barrier->static_party = participants > 0;
+ ConditionVariableInit(&barrier->condition_variable);
+}
+
+/*
+ * Arrive at this barrier, wait for all other attached participants to arrive
+ * too and then return. Increments the current phase. The caller must be
+ * attached.
+ *
+ * While waiting, pg_stat_activity shows a wait_event_class and wait_event
+ * controlled by the wait_event_info passed in, which should be a value from
+ * from one of the WaitEventXXX enums defined in pgstat.h.
+ *
+ * Return true in one arbitrarily chosen participant. Return false in all
+ * others. The return code can be used to elect one participant to execute a
+ * phase of work that must be done serially while other participants wait.
+ */
+bool
+BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
+{
+ bool release = false;
+ bool elected;
+ int start_phase;
+ int next_phase;
+
+ SpinLockAcquire(&barrier->mutex);
+ start_phase = barrier->phase;
+ next_phase = start_phase + 1;
+ ++barrier->arrived;
+ if (barrier->arrived == barrier->participants)
+ {
+ release = true;
+ barrier->arrived = 0;
+ barrier->phase = next_phase;
+ barrier->elected = next_phase;
+ }
+ SpinLockRelease(&barrier->mutex);
+
+ /*
+ * If we were the last expected participant to arrive, we can release our
+ * peers and return true to indicate that this backend has been elected to
+ * perform any serial work.
+ */
+ if (release)
+ {
+ ConditionVariableBroadcast(&barrier->condition_variable);
+
+ return true;
+ }
+
+ /*
+ * Otherwise we have to wait for the last participant to arrive and
+ * advance the phase.
+ */
+ elected = false;
+ ConditionVariablePrepareToSleep(&barrier->condition_variable);
+ for (;;)
+ {
+ /*
+ * We know that phase must either be start_phase, indicating that we
+ * need to keep waiting, or next_phase, indicating that the last
+ * participant that we were waiting for has either arrived or detached
+ * so that the next phase has begun. The phase cannot advance any
+ * further than that without this backend's participation, because
+ * this backend is attached.
+ */
+ SpinLockAcquire(&barrier->mutex);
+ Assert(barrier->phase == start_phase || barrier->phase == next_phase);
+ release = barrier->phase == next_phase;
+ if (release && barrier->elected != next_phase)
+ {
+ /*
+ * Usually the backend that arrives last and releases the other
+ * backends is elected to return true (see above), so that it can
+ * begin processing serial work while it has a CPU timeslice.
+ * However, if the barrier advanced because someone detached, then
+ * one of the backends that is awoken will need to be elected.
+ */
+ barrier->elected = barrier->phase;
+ elected = true;
+ }
+ SpinLockRelease(&barrier->mutex);
+ if (release)
+ break;
+ ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
+ }
+ ConditionVariableCancelSleep();
+
+ return elected;
+}
+
+/*
+ * Arrive at this barrier, but detach rather than waiting. Returns true if
+ * the caller was the last to detach.
+ */
+bool
+BarrierArriveAndDetach(Barrier *barrier)
+{
+ return BarrierDetachImpl(barrier, true);
+}
+
+/*
+ * Attach to a barrier. All waiting participants will now wait for this
+ * participant to call BarrierArriveAndWait(), BarrierDetach() or
+ * BarrierArriveAndDetach(). Return the current phase.
+ */
+int
+BarrierAttach(Barrier *barrier)
+{
+ int phase;
+
+ Assert(!barrier->static_party);
+
+ SpinLockAcquire(&barrier->mutex);
+ ++barrier->participants;
+ phase = barrier->phase;
+ SpinLockRelease(&barrier->mutex);
+
+ return phase;
+}
+
+/*
+ * Detach from a barrier. This may release other waiters from BarrierWait and
+ * advance the phase if they were only waiting for this backend. Return true
+ * if this participant was the last to detach.
+ */
+bool
+BarrierDetach(Barrier *barrier)
+{
+ return BarrierDetachImpl(barrier, false);
+}
+
+/*
+ * Return the current phase of a barrier. The caller must be attached.
+ */
+int
+BarrierPhase(Barrier *barrier)
+{
+ /*
+ * It is OK to read barrier->phase without locking, because it can't
+ * change without us (we are attached to it), and we executed a memory
+ * barrier when we either attached or participated in changing it last
+ * time.
+ */
+ return barrier->phase;
+}
+
+/*
+ * Return an instantaneous snapshot of the number of participants currently
+ * attached to this barrier. For debugging purposes only.
+ */
+int
+BarrierParticipants(Barrier *barrier)
+{
+ int participants;
+
+ SpinLockAcquire(&barrier->mutex);
+ participants = barrier->participants;
+ SpinLockRelease(&barrier->mutex);
+
+ return participants;
+}
+
+/*
+ * Detach from a barrier. If 'arrive' is true then also increment the phase
+ * if there are no other participants. If there are other participants
+ * waiting, then the phase will be advanced and they'll be released if they
+ * were only waiting for the caller. Return true if this participant was the
+ * last to detach.
+ */
+static inline bool
+BarrierDetachImpl(Barrier *barrier, bool arrive)
+{
+ bool release;
+ bool last;
+
+ Assert(!barrier->static_party);
+
+ SpinLockAcquire(&barrier->mutex);
+ Assert(barrier->participants > 0);
+ --barrier->participants;
+
+ /*
+ * If any other participants are waiting and we were the last participant
+ * waited for, release them. If no other participants are waiting, but
+ * this is a BarrierArriveAndDetach() call, then advance the phase too.
+ */
+ if ((arrive || barrier->participants > 0) &&
+ barrier->arrived == barrier->participants)
+ {
+ release = true;
+ barrier->arrived = 0;
+ ++barrier->phase;
+ }
+ else
+ release = false;
+
+ last = barrier->participants == 0;
+ SpinLockRelease(&barrier->mutex);
+
+ if (release)
+ ConditionVariableBroadcast(&barrier->condition_variable);
+
+ return last;
+}