From: Andres Freund Date: Thu, 30 Nov 2017 01:07:16 +0000 (-0800) Subject: Add a barrier primitive for synchronizing backends. X-Git-Tag: REL_11_BETA1~1138 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=1145acc70debacc34de01fac238defde543f4ed4;p=postgresql Add a barrier primitive for synchronizing backends. Provide support for dynamic or static parties of processes to wait for all processes to reach point in the code before continuing. This is similar to the mechanism of the same name in POSIX threads and MPI, though has explicit phasing and dynamic party support like the Java core library's Phaser. This will be used by an upcoming patch adding support for parallel hash joins. Author: Thomas Munro Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm=2_y7oi01OjA_wLvYcWMc9_d=LaoxrY3eiROCZkB_qakA@mail.gmail.com --- diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index 8a55392ade..9dbdc26c9b 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ +OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \ sinvaladt.o standby.o diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c new file mode 100644 index 0000000000..7dde932738 --- /dev/null +++ b/src/backend/storage/ipc/barrier.c @@ -0,0 +1,311 @@ +/*------------------------------------------------------------------------- + * + * barrier.c + * Barriers for synchronizing cooperating processes. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * From Wikipedia[1]: "In parallel computing, a barrier is a type of + * synchronization method. A barrier for a group of threads or processes in + * the source code means any thread/process must stop at this point and cannot + * proceed until all other threads/processes reach this barrier." + * + * This implementation of barriers allows for static sets of participants + * known up front, or dynamic sets of participants which processes can join or + * leave at any time. In the dynamic case, a phase number can be used to + * track progress through a parallel algorithm, and may be necessary to + * synchronize with the current phase of a multi-phase algorithm when a new + * participant joins. In the static case, the phase number is used + * internally, but it isn't strictly necessary for client code to access it + * because the phase can only advance when the declared number of participants + * reaches the barrier, so client code should be in no doubt about the current + * phase of computation at all times. + * + * Consider a parallel algorithm that involves separate phases of computation + * A, B and C where the output of each phase is needed before the next phase + * can begin. + * + * In the case of a static barrier initialized with 4 participants, each + * participant works on phase A, then calls BarrierArriveAndWait to wait until + * all 4 participants have reached that point. When BarrierArriveAndWait + * returns control, each participant can work on B, and so on. Because the + * barrier knows how many participants to expect, the phases of computation + * don't need labels or numbers, since each process's program counter implies + * the current phase. Even if some of the processes are slow to start up and + * begin running phase A, the other participants are expecting them and will + * patiently wait at the barrier. The code could be written as follows: + * + * perform_a(); + * BarrierArriveAndWait(&barrier, ...); + * perform_b(); + * BarrierArriveAndWait(&barrier, ...); + * perform_c(); + * BarrierArriveAndWait(&barrier, ...); + * + * If the number of participants is not known up front, then a dynamic barrier + * is needed and the number should be set to zero at initialization. New + * complications arise because the number necessarily changes over time as + * participants attach and detach, and therefore phases B, C or even the end + * of processing may be reached before any given participant has started + * running and attached. Therefore the client code must perform an initial + * test of the phase number after attaching, because it needs to find out + * which phase of the algorithm has been reached by any participants that are + * already attached in order to synchronize with that work. Once the program + * counter or some other representation of current progress is synchronized + * with the barrier's phase, normal control flow can be used just as in the + * static case. Our example could be written using a switch statement with + * cases that fall-through, as follows: + * + * phase = BarrierAttach(&barrier); + * switch (phase) + * { + * case PHASE_A: + * perform_a(); + * BarrierArriveAndWait(&barrier, ...); + * case PHASE_B: + * perform_b(); + * BarrierArriveAndWait(&barrier, ...); + * case PHASE_C: + * perform_c(); + * BarrierArriveAndWait(&barrier, ...); + * } + * BarrierDetach(&barrier); + * + * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic + * barriers behave similarly to Java's java.util.concurrent.Phaser. + * + * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science) + * + * IDENTIFICATION + * src/backend/storage/ipc/barrier.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "storage/barrier.h" + +static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive); + +/* + * Initialize this barrier. To use a static party size, provide the number of + * participants to wait for at each phase indicating that that number of + * backends is implicitly attached. To use a dynamic party size, specify zero + * here and then use BarrierAttach() and + * BarrierDetach()/BarrierArriveAndDetach() to register and deregister + * participants explicitly. + */ +void +BarrierInit(Barrier *barrier, int participants) +{ + SpinLockInit(&barrier->mutex); + barrier->participants = participants; + barrier->arrived = 0; + barrier->phase = 0; + barrier->elected = 0; + barrier->static_party = participants > 0; + ConditionVariableInit(&barrier->condition_variable); +} + +/* + * Arrive at this barrier, wait for all other attached participants to arrive + * too and then return. Increments the current phase. The caller must be + * attached. + * + * While waiting, pg_stat_activity shows a wait_event_class and wait_event + * controlled by the wait_event_info passed in, which should be a value from + * from one of the WaitEventXXX enums defined in pgstat.h. + * + * Return true in one arbitrarily chosen participant. Return false in all + * others. The return code can be used to elect one participant to execute a + * phase of work that must be done serially while other participants wait. + */ +bool +BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info) +{ + bool release = false; + bool elected; + int start_phase; + int next_phase; + + SpinLockAcquire(&barrier->mutex); + start_phase = barrier->phase; + next_phase = start_phase + 1; + ++barrier->arrived; + if (barrier->arrived == barrier->participants) + { + release = true; + barrier->arrived = 0; + barrier->phase = next_phase; + barrier->elected = next_phase; + } + SpinLockRelease(&barrier->mutex); + + /* + * If we were the last expected participant to arrive, we can release our + * peers and return true to indicate that this backend has been elected to + * perform any serial work. + */ + if (release) + { + ConditionVariableBroadcast(&barrier->condition_variable); + + return true; + } + + /* + * Otherwise we have to wait for the last participant to arrive and + * advance the phase. + */ + elected = false; + ConditionVariablePrepareToSleep(&barrier->condition_variable); + for (;;) + { + /* + * We know that phase must either be start_phase, indicating that we + * need to keep waiting, or next_phase, indicating that the last + * participant that we were waiting for has either arrived or detached + * so that the next phase has begun. The phase cannot advance any + * further than that without this backend's participation, because + * this backend is attached. + */ + SpinLockAcquire(&barrier->mutex); + Assert(barrier->phase == start_phase || barrier->phase == next_phase); + release = barrier->phase == next_phase; + if (release && barrier->elected != next_phase) + { + /* + * Usually the backend that arrives last and releases the other + * backends is elected to return true (see above), so that it can + * begin processing serial work while it has a CPU timeslice. + * However, if the barrier advanced because someone detached, then + * one of the backends that is awoken will need to be elected. + */ + barrier->elected = barrier->phase; + elected = true; + } + SpinLockRelease(&barrier->mutex); + if (release) + break; + ConditionVariableSleep(&barrier->condition_variable, wait_event_info); + } + ConditionVariableCancelSleep(); + + return elected; +} + +/* + * Arrive at this barrier, but detach rather than waiting. Returns true if + * the caller was the last to detach. + */ +bool +BarrierArriveAndDetach(Barrier *barrier) +{ + return BarrierDetachImpl(barrier, true); +} + +/* + * Attach to a barrier. All waiting participants will now wait for this + * participant to call BarrierArriveAndWait(), BarrierDetach() or + * BarrierArriveAndDetach(). Return the current phase. + */ +int +BarrierAttach(Barrier *barrier) +{ + int phase; + + Assert(!barrier->static_party); + + SpinLockAcquire(&barrier->mutex); + ++barrier->participants; + phase = barrier->phase; + SpinLockRelease(&barrier->mutex); + + return phase; +} + +/* + * Detach from a barrier. This may release other waiters from BarrierWait and + * advance the phase if they were only waiting for this backend. Return true + * if this participant was the last to detach. + */ +bool +BarrierDetach(Barrier *barrier) +{ + return BarrierDetachImpl(barrier, false); +} + +/* + * Return the current phase of a barrier. The caller must be attached. + */ +int +BarrierPhase(Barrier *barrier) +{ + /* + * It is OK to read barrier->phase without locking, because it can't + * change without us (we are attached to it), and we executed a memory + * barrier when we either attached or participated in changing it last + * time. + */ + return barrier->phase; +} + +/* + * Return an instantaneous snapshot of the number of participants currently + * attached to this barrier. For debugging purposes only. + */ +int +BarrierParticipants(Barrier *barrier) +{ + int participants; + + SpinLockAcquire(&barrier->mutex); + participants = barrier->participants; + SpinLockRelease(&barrier->mutex); + + return participants; +} + +/* + * Detach from a barrier. If 'arrive' is true then also increment the phase + * if there are no other participants. If there are other participants + * waiting, then the phase will be advanced and they'll be released if they + * were only waiting for the caller. Return true if this participant was the + * last to detach. + */ +static inline bool +BarrierDetachImpl(Barrier *barrier, bool arrive) +{ + bool release; + bool last; + + Assert(!barrier->static_party); + + SpinLockAcquire(&barrier->mutex); + Assert(barrier->participants > 0); + --barrier->participants; + + /* + * If any other participants are waiting and we were the last participant + * waited for, release them. If no other participants are waiting, but + * this is a BarrierArriveAndDetach() call, then advance the phase too. + */ + if ((arrive || barrier->participants > 0) && + barrier->arrived == barrier->participants) + { + release = true; + barrier->arrived = 0; + ++barrier->phase; + } + else + release = false; + + last = barrier->participants == 0; + SpinLockRelease(&barrier->mutex); + + if (release) + ConditionVariableBroadcast(&barrier->condition_variable); + + return last; +} diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h new file mode 100644 index 0000000000..0abaa2b98b --- /dev/null +++ b/src/include/storage/barrier.h @@ -0,0 +1,45 @@ +/*------------------------------------------------------------------------- + * + * barrier.h + * Barriers for synchronizing cooperating processes. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/barrier.h + * + *------------------------------------------------------------------------- + */ +#ifndef BARRIER_H +#define BARRIER_H + +/* + * For the header previously known as "barrier.h", please include + * "port/atomics.h", which deals with atomics, compiler barriers and memory + * barriers. + */ + +#include "storage/condition_variable.h" +#include "storage/spin.h" + +typedef struct Barrier +{ + slock_t mutex; + int phase; /* phase counter */ + int participants; /* the number of participants attached */ + int arrived; /* the number of participants that have + * arrived */ + int elected; /* highest phase elected */ + bool static_party; /* used only for assertions */ + ConditionVariable condition_variable; +} Barrier; + +extern void BarrierInit(Barrier *barrier, int num_workers); +extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info); +extern bool BarrierArriveAndDetach(Barrier *barrier); +extern int BarrierAttach(Barrier *barrier); +extern bool BarrierDetach(Barrier *barrier); +extern int BarrierPhase(Barrier *barrier); +extern int BarrierParticipants(Barrier *barrier); + +#endif /* BARRIER_H */