From 8e5c2afa98d9c28c9883c4c42890f3a6dd99ba4e Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Tue, 6 Mar 2018 15:57:20 -0300 Subject: [PATCH] Refrain from duplicating data in reorderbuffers MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit If a walsender exits leaving data in reorderbuffers, the next walsender that tries to decode the same transaction would append its decoded data in the same spill files without truncating it first, which effectively duplicate the data. Avoid that by removing any leftover reorderbuffer spill files when a walsender starts. Backpatch to 9.4; this bug has been there from the very beginning of logical decoding. Author: Craig Ringer, revised by me Reviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada --- .../replication/logical/reorderbuffer.c | 137 +++++++++++------- 1 file changed, 82 insertions(+), 55 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7c8f02234b..93f373c9ef 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -202,6 +202,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferCleanupSerializedTXNs(const char *slotname); +static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, + TransactionId xid, XLogSegNo segno); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -220,7 +223,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t /* - * Allocate a new ReorderBuffer + * Allocate a new ReorderBuffer and clean out any old serialized state from + * prior ReorderBuffer instances for the same slot. */ ReorderBuffer * ReorderBufferAllocate(void) @@ -229,6 +233,8 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; + Assert(MyReplicationSlot != NULL); + /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -265,6 +271,13 @@ ReorderBufferAllocate(void) dlist_init(&buffer->cached_changes); slist_init(&buffer->cached_tuplebufs); + /* + * Ensure there's no stale data from prior uses of this slot, in case some + * prior exit avoided calling ReorderBufferFree. Failure to do this can + * produce duplicated txns, and it's very cheap if there's nothing there. + */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); + return buffer; } @@ -281,6 +294,9 @@ ReorderBufferFree(ReorderBuffer *rb) * memory context. */ MemoryContextDelete(context); + + /* Free disk space used by unconsumed reorder buffers */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); } /* @@ -2117,7 +2133,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size spilled = 0; - char path[MAXPGPATH]; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -2144,21 +2159,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) */ if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo)) { - XLogRecPtr recptr; + char path[MAXPGPATH]; if (fd != -1) CloseTransientFile(fd); XLByteToSeg(change->lsn, curOpenSegNo); - XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr); /* * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. */ - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + curOpenSegNo); /* open segment, create it if necessary */ fd = OpenTransientFile(path, @@ -2168,8 +2181,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (fd < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); + errmsg("could not open file \"%s\": %m", path))); } ReorderBufferSerializeChange(rb, txn, fd, change); @@ -2385,25 +2397,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, if (*fd == -1) { - XLogRecPtr recptr; char path[MAXPGPATH]; /* first time in */ if (*segno == 0) - { XLByteToSeg(txn->first_lsn, *segno); - } Assert(*segno != 0 || dlist_is_empty(&txn->changes)); - XLogSegNoOffsetToRecPtr(*segno, 0, recptr); /* * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. */ - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + *segno); *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); if (*fd < 0 && errno == ENOENT) @@ -2635,13 +2642,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) for (cur = first; cur <= last; cur++) { char path[MAXPGPATH]; - XLogRecPtr recptr; - - XLogSegNoOffsetToRecPtr(cur, 0, recptr); - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur); if (unlink(path) != 0 && errno != ENOENT) ereport(ERROR, (errcode_for_file_access(), @@ -2649,6 +2651,63 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) } } +/* + * Remove any leftover serialized reorder buffers from a slot directory after a + * prior crash or decoding session exit. + */ +static void +ReorderBufferCleanupSerializedTXNs(const char *slotname) +{ + DIR *spill_dir; + struct dirent *spill_de; + struct stat statbuf; + char path[MAXPGPATH * 2 + 12]; + + sprintf(path, "pg_replslot/%s", slotname); + + /* we're only handling directories here, skip if it's not ours */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + spill_dir = AllocateDir(path); + while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL) + { + /* only look at names that can be ours */ + if (strncmp(spill_de->d_name, "xid", 3) == 0) + { + snprintf(path, sizeof(path), + "pg_replslot/%s/%s", slotname, + spill_de->d_name); + + if (unlink(path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m", + path, slotname))); + } + } + FreeDir(spill_dir); +} + +/* + * Given a replication slot, transaction ID and segment number, fill in the + * corresponding spill file into 'path', which is a caller-owned buffer of size + * at least MAXPGPATH. + */ +static void +ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, + XLogSegNo segno) +{ + XLogRecPtr recptr; + + XLogSegNoOffsetToRecPtr(segno, 0, recptr); + + snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", + NameStr(MyReplicationSlot->data.name), + xid, + (uint32) (recptr >> 32), (uint32) recptr); +} + /* * Delete all data spilled to disk after we've restarted/crashed. It will be * recreated when the respective slots are reused. @@ -2659,15 +2718,9 @@ StartupReorderBuffer(void) DIR *logical_dir; struct dirent *logical_de; - DIR *spill_dir; - struct dirent *spill_de; - logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; - char path[MAXPGPATH * 2 + 12]; - if (strcmp(logical_de->d_name, ".") == 0 || strcmp(logical_de->d_name, "..") == 0) continue; @@ -2680,33 +2733,7 @@ StartupReorderBuffer(void) * ok, has to be a surviving logical slot, iterate and delete * everything starting with xid-* */ - sprintf(path, "pg_replslot/%s", logical_de->d_name); - - /* we're only creating directories here, skip if it's not our's */ - if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) - continue; - - spill_dir = AllocateDir(path); - while ((spill_de = ReadDir(spill_dir, path)) != NULL) - { - if (strcmp(spill_de->d_name, ".") == 0 || - strcmp(spill_de->d_name, "..") == 0) - continue; - - /* only look at names that can be ours */ - if (strncmp(spill_de->d_name, "xid", 3) == 0) - { - sprintf(path, "pg_replslot/%s/%s", logical_de->d_name, - spill_de->d_name); - - if (unlink(path) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", - path))); - } - } - FreeDir(spill_dir); + ReorderBufferCleanupSerializedTXNs(logical_de->d_name); } FreeDir(logical_dir); } -- 2.40.0