static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
+static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
+ TransactionId xid, XLogSegNo segno);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
/*
- * Allocate a new ReorderBuffer
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
+ * prior ReorderBuffer instances for the same slot.
*/
ReorderBuffer *
ReorderBufferAllocate(void)
HASHCTL hash_ctl;
MemoryContext new_ctx;
+ Assert(MyReplicationSlot != NULL);
+
/* allocate memory in own context, to have better accountability */
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
"ReorderBuffer",
dlist_init(&buffer->cached_changes);
slist_init(&buffer->cached_tuplebufs);
+ /*
+ * Ensure there's no stale data from prior uses of this slot, in case some
+ * prior exit avoided calling ReorderBufferFree. Failure to do this can
+ * produce duplicated txns, and it's very cheap if there's nothing there.
+ */
+ ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
+
return buffer;
}
* memory context.
*/
MemoryContextDelete(context);
+
+ /* Free disk space used by unconsumed reorder buffers */
+ ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
}
/*
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
- char path[MAXPGPATH];
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
*/
if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
{
- XLogRecPtr recptr;
+ char path[MAXPGPATH];
if (fd != -1)
CloseTransientFile(fd);
XLByteToSeg(change->lsn, curOpenSegNo);
- XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
- sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
- NameStr(MyReplicationSlot->data.name), txn->xid,
- (uint32) (recptr >> 32), (uint32) recptr);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
+ curOpenSegNo);
/* open segment, create it if necessary */
fd = OpenTransientFile(path,
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
+ errmsg("could not open file \"%s\": %m", path)));
}
ReorderBufferSerializeChange(rb, txn, fd, change);
if (*fd == -1)
{
- XLogRecPtr recptr;
char path[MAXPGPATH];
/* first time in */
if (*segno == 0)
- {
XLByteToSeg(txn->first_lsn, *segno);
- }
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
- XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
/*
* No need to care about TLIs here, only used during a single run,
* so each LSN only maps to a specific WAL record.
*/
- sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
- NameStr(MyReplicationSlot->data.name), txn->xid,
- (uint32) (recptr >> 32), (uint32) recptr);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
+ *segno);
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
if (*fd < 0 && errno == ENOENT)
for (cur = first; cur <= last; cur++)
{
char path[MAXPGPATH];
- XLogRecPtr recptr;
-
- XLogSegNoOffsetToRecPtr(cur, 0, recptr);
- sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
- NameStr(MyReplicationSlot->data.name), txn->xid,
- (uint32) (recptr >> 32), (uint32) recptr);
+ ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
if (unlink(path) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
}
}
+/*
+ * Remove any leftover serialized reorder buffers from a slot directory after a
+ * prior crash or decoding session exit.
+ */
+static void
+ReorderBufferCleanupSerializedTXNs(const char *slotname)
+{
+ DIR *spill_dir;
+ struct dirent *spill_de;
+ struct stat statbuf;
+ char path[MAXPGPATH * 2 + 12];
+
+ sprintf(path, "pg_replslot/%s", slotname);
+
+ /* we're only handling directories here, skip if it's not ours */
+ if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+ return;
+
+ spill_dir = AllocateDir(path);
+ while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
+ {
+ /* only look at names that can be ours */
+ if (strncmp(spill_de->d_name, "xid", 3) == 0)
+ {
+ snprintf(path, sizeof(path),
+ "pg_replslot/%s/%s", slotname,
+ spill_de->d_name);
+
+ if (unlink(path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
+ path, slotname)));
+ }
+ }
+ FreeDir(spill_dir);
+}
+
+/*
+ * Given a replication slot, transaction ID and segment number, fill in the
+ * corresponding spill file into 'path', which is a caller-owned buffer of size
+ * at least MAXPGPATH.
+ */
+static void
+ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
+ XLogSegNo segno)
+{
+ XLogRecPtr recptr;
+
+ XLogSegNoOffsetToRecPtr(segno, 0, recptr);
+
+ snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
+ NameStr(MyReplicationSlot->data.name),
+ xid,
+ (uint32) (recptr >> 32), (uint32) recptr);
+}
+
/*
* Delete all data spilled to disk after we've restarted/crashed. It will be
* recreated when the respective slots are reused.
DIR *logical_dir;
struct dirent *logical_de;
- DIR *spill_dir;
- struct dirent *spill_de;
-
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{
- struct stat statbuf;
- char path[MAXPGPATH * 2 + 12];
-
if (strcmp(logical_de->d_name, ".") == 0 ||
strcmp(logical_de->d_name, "..") == 0)
continue;
* ok, has to be a surviving logical slot, iterate and delete
* everything starting with xid-*
*/
- sprintf(path, "pg_replslot/%s", logical_de->d_name);
-
- /* we're only creating directories here, skip if it's not our's */
- if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
- continue;
-
- spill_dir = AllocateDir(path);
- while ((spill_de = ReadDir(spill_dir, path)) != NULL)
- {
- if (strcmp(spill_de->d_name, ".") == 0 ||
- strcmp(spill_de->d_name, "..") == 0)
- continue;
-
- /* only look at names that can be ours */
- if (strncmp(spill_de->d_name, "xid", 3) == 0)
- {
- sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
- spill_de->d_name);
-
- if (unlink(path) != 0)
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not remove file \"%s\": %m",
- path)));
- }
- }
- FreeDir(spill_dir);
+ ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
}
FreeDir(logical_dir);
}