]> granicus.if.org Git - postgresql/commitdiff
Fix and document lock handling for in-memory replication slot data
authorMichael Paquier <michael@paquier.xyz>
Fri, 1 Jun 2018 18:30:55 +0000 (14:30 -0400)
committerMichael Paquier <michael@paquier.xyz>
Sun, 10 Jun 2018 10:39:26 +0000 (19:39 +0900)
While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots, or modules looking directly at slot information.

The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).

A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.

Some of the fixed code exists down to 9.4 where WAL decoding has been
introduced, but as those race conditions are really unlikely going to
happen as those concern code paths for slot and decoding creation, just
fix the problem on HEAD.

Author: Michael Paquier

Discussion: https://postgr.es/m/20180528085747.GA27845@paquier.xyz

src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/include/replication/slot.h

index 139359153821112213d1ca33ed94f7d4658e067e..61588d626f608006196c769ad9807f1d3ac592e9 100644 (file)
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
        xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+       SpinLockAcquire(&slot->mutex);
        slot->effective_catalog_xmin = xmin_horizon;
        slot->data.catalog_xmin = xmin_horizon;
        if (need_full_snapshot)
                slot->effective_xmin = xmin_horizon;
+       SpinLockRelease(&slot->mutex);
 
        ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
        XLogRecPtr      startptr;
+       ReplicationSlot *slot = ctx->slot;
 
        /* Initialize from where to start reading WAL. */
-       startptr = ctx->slot->data.restart_lsn;
+       startptr = slot->data.restart_lsn;
 
        elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-                (uint32) (ctx->slot->data.restart_lsn >> 32),
-                (uint32) ctx->slot->data.restart_lsn);
+                (uint32) (slot->data.restart_lsn >> 32),
+                (uint32) slot->data.restart_lsn);
 
        /* Wait for a consistent starting point */
        for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
                CHECK_FOR_INTERRUPTS();
        }
 
-       ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+       SpinLockAcquire(&slot->mutex);
+       slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+       SpinLockRelease(&slot->mutex);
 }
 
 /*
index 056628fe8e3fc23541e6634ed0b30ba691bc1fa5..79d7a57d677fba75aba913688659148215c44dee 100644 (file)
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
                        XLogRecPtr      flushptr;
 
                        /* start at current insert position */
+                       SpinLockAcquire(&slot->mutex);
                        slot->data.restart_lsn = GetXLogInsertRecPtr();
+                       SpinLockRelease(&slot->mutex);
 
                        /* make sure we have enough information to start */
                        flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
                }
                else
                {
+                       SpinLockAcquire(&slot->mutex);
                        slot->data.restart_lsn = GetRedoRecPtr();
+                       SpinLockRelease(&slot->mutex);
                }
 
                /* prevent WAL removal as fast as possible */
index 76a88c6de78e0ea0dcb021d5bf419f9a45d9d244..7964ae254f4bac0ccab4d9a49f3c8fa8c44764ed 100644 (file)
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use flag is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot.  The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
  */
 typedef struct ReplicationSlot
 {