1 /*-------------------------------------------------------------------------
4 * Logical replication progress tracking support.
6 * Copyright (c) 2013-2015, PostgreSQL Global Development Group
9 * src/backend/replication/logical/origin.c
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to store and persist replication progress in an
16 * efficient and durable manner.
18 * Replication origins consist of a descriptive, user defined, external
19 * name and a short, thus space efficient, internal 2 byte one. This split
20 * exists because replication origins have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allows us to be more space efficient.
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationStates) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows us to advance replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows us to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
45 * There are several levels of locking at work:
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_origin is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (c.f. replorigin_advance).
65 * ---------------------------------------------------------------------------
74 #include "miscadmin.h"
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
81 #include "catalog/indexing.h"
83 #include "nodes/execnodes.h"
85 #include "replication/origin.h"
86 #include "replication/logical.h"
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
101 * Replay progress of a single remote node.
103 typedef struct ReplicationState
106 * Local identifier for the remote node.
111 * Location of the latest commit from the remote side.
113 XLogRecPtr remote_lsn;
116 * Remember the local lsn of the commit record so we can XLogFlush() to it
117 * during a checkpoint so we know the commit record actually is safe on
120 XLogRecPtr local_lsn;
123 * Slot is setup in backend?
128 * Lock protecting remote_lsn and local_lsn.
134 * On disk version of ReplicationState.
136 typedef struct ReplicationStateOnDisk
139 XLogRecPtr remote_lsn;
140 } ReplicationStateOnDisk;
143 typedef struct ReplicationStateCtl
146 LWLockTranche tranche;
147 ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
148 } ReplicationStateCtl;
150 /* external variables */
151 RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
152 XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
153 TimestampTz replorigin_session_origin_timestamp = 0;
156 * Base address into a shared memory array of replication states of size
157 * max_replication_slots.
159 * XXX: Should we use a separate variable to size this rather than
160 * max_replication_slots?
162 static ReplicationState *replication_states;
163 static ReplicationStateCtl *replication_states_ctl;
166 * Backend-local, cached element from ReplicationStates for use in a backend
167 * replaying remote commits, so we don't have to search ReplicationStates for
168 * the backend's current RepOriginId.
170 static ReplicationState *session_replication_state = NULL;
172 /* Magic for on disk files. */
173 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
176 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
180 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
181 errmsg("only superusers can query or manipulate replication origins")));
183 if (check_slots && max_replication_slots == 0)
185 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
186 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
188 if (!recoveryOK && RecoveryInProgress())
190 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
191 errmsg("cannot manipulate replication origins during recovery")));
196 /* ---------------------------------------------------------------------------
197 * Functions for working with replication origins themselves.
198 * ---------------------------------------------------------------------------
202 * Check for a persistent replication origin identified by name.
204 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
207 replorigin_by_name(char *roname, bool missing_ok)
209 Form_pg_replication_origin ident;
210 Oid roident = InvalidOid;
214 roname_d = CStringGetTextDatum(roname);
216 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
217 if (HeapTupleIsValid(tuple))
219 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
220 roident = ident->roident;
221 ReleaseSysCache(tuple);
223 else if (!missing_ok)
224 elog(ERROR, "cache lookup failed for replication origin '%s'",
231 * Create a replication origin.
233 * Needs to be called in a transaction.
236 replorigin_create(char *roname)
239 HeapTuple tuple = NULL;
242 SnapshotData SnapshotDirty;
246 roname_d = CStringGetTextDatum(roname);
248 Assert(IsTransactionState());
251 * We need the numeric replication origin to be 16bit wide, so we cannot
252 * rely on the normal oid allocation. Instead we simply scan
253 * pg_replication_origin for the first unused id. That's not particularly
254 * efficient, but this should be a fairly infrequent operation - we can
255 * easily spend a bit more code on this when it turns out it needs to be
258 * We handle concurrency by taking an exclusive lock (allowing reads!)
259 * over the table for the duration of the search. Because we use a "dirty
260 * snapshot" we can read rows that other in-progress sessions have
261 * written, even though they would be invisible with normal snapshots. Due
262 * to the exclusive lock there's no danger that new rows can appear while
265 InitDirtySnapshot(SnapshotDirty);
267 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
269 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
271 bool nulls[Natts_pg_replication_origin];
272 Datum values[Natts_pg_replication_origin];
275 CHECK_FOR_INTERRUPTS();
278 Anum_pg_replication_origin_roident,
279 BTEqualStrategyNumber, F_OIDEQ,
280 ObjectIdGetDatum(roident));
282 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
287 collides = HeapTupleIsValid(systable_getnext(scan));
289 systable_endscan(scan);
294 * Ok, found an unused roident, insert the new row and do a CCI,
295 * so our callers can look it up if they want to.
297 memset(&nulls, 0, sizeof(nulls));
299 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
300 values[Anum_pg_replication_origin_roname - 1] = roname_d;
302 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
303 simple_heap_insert(rel, tuple);
304 CatalogUpdateIndexes(rel, tuple);
305 CommandCounterIncrement();
310 /* now release lock again, */
311 heap_close(rel, ExclusiveLock);
315 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
316 errmsg("no free replication origin oid could be found")));
318 heap_freetuple(tuple);
324 * Drop replication origin.
326 * Needs to be called in a transaction.
329 replorigin_drop(RepOriginId roident)
331 HeapTuple tuple = NULL;
335 Assert(IsTransactionState());
337 rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
339 /* cleanup the slot state info */
340 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
342 for (i = 0; i < max_replication_slots; i++)
344 ReplicationState *state = &replication_states[i];
347 if (state->roident == roident)
349 if (state->acquired_by != 0)
352 (errcode(ERRCODE_OBJECT_IN_USE),
353 errmsg("cannot drop replication origin with oid %d, in use by pid %d",
355 state->acquired_by)));
360 xl_replorigin_drop xlrec;
362 xlrec.node_id = roident;
364 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
365 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
368 /* then reset the in-memory entry */
369 state->roident = InvalidRepOriginId;
370 state->remote_lsn = InvalidXLogRecPtr;
371 state->local_lsn = InvalidXLogRecPtr;
375 LWLockRelease(ReplicationOriginLock);
377 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
378 if (!HeapTupleIsValid(tuple))
379 elog(ERROR, "cache lookup failed for replication origin with oid %u",
382 simple_heap_delete(rel, &tuple->t_self);
383 ReleaseSysCache(tuple);
385 CommandCounterIncrement();
387 /* now release lock again, */
388 heap_close(rel, ExclusiveLock);
393 * Lookup replication origin via its oid and return the name.
395 * The external name is palloc'd in the calling context.
397 * Returns true if the origin is known, false otherwise.
400 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
403 Form_pg_replication_origin ric;
405 Assert(OidIsValid((Oid) roident));
406 Assert(roident != InvalidRepOriginId);
407 Assert(roident != DoNotReplicateId);
409 tuple = SearchSysCache1(REPLORIGIDENT,
410 ObjectIdGetDatum((Oid) roident));
412 if (HeapTupleIsValid(tuple))
414 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
415 *roname = text_to_cstring(&ric->roname);
416 ReleaseSysCache(tuple);
425 elog(ERROR, "cache lookup failed for replication origin with oid %u",
433 /* ---------------------------------------------------------------------------
434 * Functions for handling replication progress.
435 * ---------------------------------------------------------------------------
439 ReplicationOriginShmemSize(void)
444 * XXX: max_replication_slots is arguably the wrong thing to use, as here
445 * we keep the replay state of *remote* transactions. But for now it seems
446 * sufficient to reuse it, lest we introduce a separate guc.
448 if (max_replication_slots == 0)
451 size = add_size(size, offsetof(ReplicationStateCtl, states));
453 size = add_size(size,
454 mul_size(max_replication_slots, sizeof(ReplicationState)));
/*
 * ReplicationOriginShmemInit
 *
 * Obtains the shared-memory struct named "ReplicationOriginState", sized by
 * ReplicationOriginShmemSize(), and points replication_states at its
 * states[] array.  The setup statements below allocate an LWLock tranche id,
 * describe the per-state locks to the tranche, zero the state array and
 * initialize one lock per entry; finally the tranche is registered.
 *
 * max_replication_slots == 0 is checked first.
 *
 * NOTE(review): several lines of this function (declarations, braces, the
 * statement guarded by the max_replication_slots check, and whatever guards
 * the first-time setup) are not visible in this view -- confirm against the
 * full file.
 */
459 ReplicationOriginShmemInit(void)
463 if (max_replication_slots == 0)
466 replication_states_ctl = (ReplicationStateCtl *)
467 ShmemInitStruct("ReplicationOriginState",
468 ReplicationOriginShmemSize(),
/* replication_states aliases the flexible array that follows the header */
470 replication_states = replication_states_ctl->states;
476 replication_states_ctl->tranche_id = LWLockNewTrancheId();
477 replication_states_ctl->tranche.name = "ReplicationOrigins";
478 replication_states_ctl->tranche.array_base =
479 &replication_states[0].lock;
480 replication_states_ctl->tranche.array_stride =
481 sizeof(ReplicationState);
/*
 * Zero only the states[] array.  replication_states starts
 * offsetof(ReplicationStateCtl, states) bytes into the allocation, so
 * clearing ReplicationOriginShmemSize() bytes from here would write that
 * many bytes past the end of the struct.  Limiting the clear to the array
 * also keeps the tranche fields assigned just above intact.
 */
483 MemSet(replication_states, 0, mul_size(max_replication_slots, sizeof(ReplicationState)));
485 for (i = 0; i < max_replication_slots; i++)
486 LWLockInitialize(&replication_states[i].lock,
487 replication_states_ctl->tranche_id);
490 LWLockRegisterTranche(replication_states_ctl->tranche_id,
491 &replication_states_ctl->tranche);
494 /* ---------------------------------------------------------------------------
495 * Perform a checkpoint of each replication origin's progress with respect to
496 * the replayed remote_lsn. Make sure that all transactions we refer to in the
497 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
498 * if the transactions were originally committed asynchronously.
500 * We store checkpoints in the following format:
501 * +-------+------------------------+------------------+-----+--------+
502 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
503 * +-------+------------------------+------------------+-----+--------+
505 * So it's just the magic, followed by the statically sized
506 * ReplicationStateOnDisk structs. Note that the maximum number of
507 * ReplicationStates is determined by max_replication_slots.
508 * ---------------------------------------------------------------------------
511 CheckPointReplicationOrigin(void)
513 const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
514 const char *path = "pg_logical/replorigin_checkpoint";
517 uint32 magic = REPLICATION_STATE_MAGIC;
520 if (max_replication_slots == 0)
525 /* make sure no old temp file is remaining */
526 if (unlink(tmppath) < 0 && errno != ENOENT)
528 (errcode_for_file_access(),
529 errmsg("could not remove file \"%s\": %m",
533 * no other backend can perform this at the same time, we're protected by
536 tmpfd = OpenTransientFile((char *) tmppath,
537 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
541 (errcode_for_file_access(),
542 errmsg("could not create file \"%s\": %m",
546 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
548 CloseTransientFile(tmpfd);
550 (errcode_for_file_access(),
551 errmsg("could not write to file \"%s\": %m",
554 COMP_CRC32C(crc, &magic, sizeof(magic));
556 /* prevent concurrent creations/drops */
557 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
559 /* write actual data */
560 for (i = 0; i < max_replication_slots; i++)
562 ReplicationStateOnDisk disk_state;
563 ReplicationState *curstate = &replication_states[i];
564 XLogRecPtr local_lsn;
566 if (curstate->roident == InvalidRepOriginId)
569 LWLockAcquire(&curstate->lock, LW_SHARED);
571 disk_state.roident = curstate->roident;
573 disk_state.remote_lsn = curstate->remote_lsn;
574 local_lsn = curstate->local_lsn;
576 LWLockRelease(&curstate->lock);
578 /* make sure we only write out a commit that's persistent */
579 XLogFlush(local_lsn);
581 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
584 CloseTransientFile(tmpfd);
586 (errcode_for_file_access(),
587 errmsg("could not write to file \"%s\": %m",
591 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
594 LWLockRelease(ReplicationOriginLock);
596 /* write out the CRC */
598 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
600 CloseTransientFile(tmpfd);
602 (errcode_for_file_access(),
603 errmsg("could not write to file \"%s\": %m",
607 /* fsync the temporary file */
608 if (pg_fsync(tmpfd) != 0)
610 CloseTransientFile(tmpfd);
612 (errcode_for_file_access(),
613 errmsg("could not fsync file \"%s\": %m",
617 CloseTransientFile(tmpfd);
619 /* rename to permanent file, fsync file and directory */
620 if (rename(tmppath, path) != 0)
623 (errcode_for_file_access(),
624 errmsg("could not rename file \"%s\" to \"%s\": %m",
628 fsync_fname((char *) path, false);
629 fsync_fname("pg_logical", true);
633 * Recover replication replay status from checkpoint data saved earlier by
634 * CheckPointReplicationOrigin.
636 * This only needs to be called at startup and *not* during every checkpoint
637 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
638 * state thereafter can be recovered by looking at commit records.
/*
 * StartupReplicationOrigin
 *
 * Opens "pg_logical/replorigin_checkpoint", reads and checks the leading
 * magic number, then reads ReplicationStateOnDisk records one at a time,
 * copying roident and remote_lsn of each into successive entries of
 * replication_states[].  A CRC32C is accumulated over the magic and every
 * record read; a read that returns exactly sizeof(crc) bytes is taken as the
 * stored checksum that ends the file.
 *
 * Errors are reported for: an open failure, a short or failed read, a magic
 * mismatch, more records than max_replication_slots, and a bad checksum.
 *
 * NOTE(review): lines of this function (declarations, braces, the ereport
 * heads with their severity levels, loop control, the checksum comparison)
 * are not visible in this view -- confirm details against the full file.
 */
641 StartupReplicationOrigin(void)
643 const char *path = "pg_logical/replorigin_checkpoint";
646 uint32 magic = REPLICATION_STATE_MAGIC;
651 /* don't want to overwrite already existing state */
652 #ifdef USE_ASSERT_CHECKING
653 static bool already_started = false;
655 Assert(!already_started);
656 already_started = true;
659 if (max_replication_slots == 0)
664 elog(DEBUG2, "starting up replication origin progress state");
666 fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
669 * might have had max_replication_slots == 0 last run, or we just brought
/* a missing file is handled separately from other open failures */
672 if (fd < 0 && errno == ENOENT)
676 (errcode_for_file_access(),
677 errmsg("could not open file \"%s\": %m",
680 /* verify magic, that's written even if nothing was active */
681 readBytes = read(fd, &magic, sizeof(magic));
682 if (readBytes != sizeof(magic))
684 (errmsg("could not read file \"%s\": %m",
/* the magic is part of the checksummed data */
686 COMP_CRC32C(crc, &magic, sizeof(magic));
688 if (magic != REPLICATION_STATE_MAGIC)
690 (errmsg("replication checkpoint has wrong magic %u instead of %u",
691 magic, REPLICATION_STATE_MAGIC)));
693 /* we can skip locking here, no other access is possible */
695 /* recover individual states, until there are no more to be found */
698 ReplicationStateOnDisk disk_state;
700 readBytes = read(fd, &disk_state, sizeof(disk_state));
702 /* no further data */
/* exactly sizeof(crc) bytes read: this is the trailing checksum */
703 if (readBytes == sizeof(crc))
705 /* not pretty, but simple ... */
706 file_crc = *(pg_crc32c *) &disk_state;
713 (errcode_for_file_access(),
714 errmsg("could not read file \"%s\": %m",
718 if (readBytes != sizeof(disk_state))
721 (errcode_for_file_access(),
722 errmsg("could not read file \"%s\": read %d of %zu",
723 path, readBytes, sizeof(disk_state))));
726 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
/* the file holds more records than there are in-memory entries */
728 if (last_state == max_replication_slots)
730 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
731 errmsg("no free replication state could be found, increase max_replication_slots")));
733 /* copy data to shared memory */
734 replication_states[last_state].roident = disk_state.roident;
735 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
738 elog(LOG, "recovered replication state of node %u to %X/%X",
740 (uint32) (disk_state.remote_lsn >> 32),
741 (uint32) disk_state.remote_lsn);
744 /* now check checksum */
/*
 * A checksum mismatch means the file contents are damaged; that is data
 * corruption, not an exceeded configuration limit.
 * NOTE(review): the message text says "replication_slot_checkpoint" while
 * the file read here is replorigin_checkpoint -- consider rewording.
 */
748 (errcode(ERRCODE_DATA_CORRUPTED),
749 errmsg("replication_slot_checkpoint has wrong checksum %u, expected %u",
752 CloseTransientFile(fd);
756 replorigin_redo(XLogReaderState *record)
758 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
762 case XLOG_REPLORIGIN_SET:
764 xl_replorigin_set *xlrec =
765 (xl_replorigin_set *) XLogRecGetData(record);
767 replorigin_advance(xlrec->node_id,
768 xlrec->remote_lsn, record->EndRecPtr,
769 xlrec->force /* backward */ ,
770 false /* WAL log */ );
773 case XLOG_REPLORIGIN_DROP:
775 xl_replorigin_drop *xlrec;
778 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
780 for (i = 0; i < max_replication_slots; i++)
782 ReplicationState *state = &replication_states[i];
785 if (state->roident == xlrec->node_id)
788 state->roident = InvalidRepOriginId;
789 state->remote_lsn = InvalidXLogRecPtr;
790 state->local_lsn = InvalidXLogRecPtr;
797 elog(PANIC, "replorigin_redo: unknown op code %u", info);
803 * Tell the replication origin progress machinery that a commit from 'node'
804 * that originated at the LSN remote_commit on the remote node was replayed
805 * successfully and that we don't need to do so again. In combination with
806 * setting up replorigin_session_origin_lsn and replorigin_session_origin
807 * that ensures we won't lose knowledge about that after a crash if the
808 * transaction had a persistent effect (think of asynchronous commits).
810 * local_commit needs to be a local LSN of the commit so that we can make sure
811 * upon a checkpoint that enough WAL has been persisted to disk.
813 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
814 * unless running in recovery.
817 replorigin_advance(RepOriginId node,
818 XLogRecPtr remote_commit, XLogRecPtr local_commit,
819 bool go_backward, bool wal_log)
822 ReplicationState *replication_state = NULL;
823 ReplicationState *free_state = NULL;
825 Assert(node != InvalidRepOriginId);
827 /* we don't track DoNotReplicateId */
828 if (node == DoNotReplicateId)
832 * XXX: For the case where this is called by WAL replay, it'd be more
833 * efficient to restore into a backend local hashtable and only dump into
834 * shmem after recovery is finished. Let's wait with implementing that
835 * till it's shown to be a measurable expense
838 /* Lock exclusively, as we may have to create a new table entry. */
839 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
842 * Search for either an existing slot for the origin, or a free one we can
845 for (i = 0; i < max_replication_slots; i++)
847 ReplicationState *curstate = &replication_states[i];
849 /* remember where to insert if necessary */
850 if (curstate->roident == InvalidRepOriginId &&
853 free_state = curstate;
858 if (curstate->roident != node)
864 replication_state = curstate;
866 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
868 /* Make sure it's not used by somebody else */
869 if (replication_state->acquired_by != 0)
872 (errcode(ERRCODE_OBJECT_IN_USE),
873 errmsg("replication origin with oid %d is already active for pid %d",
874 replication_state->roident,
875 replication_state->acquired_by)));
881 if (replication_state == NULL && free_state == NULL)
883 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
884 errmsg("no free replication state slot could be found for replication origin with oid %u",
886 errhint("Increase max_replication_slots and try again.")));
888 if (replication_state == NULL)
890 /* initialize new slot */
891 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
892 replication_state = free_state;
893 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
894 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
895 replication_state->roident = node;
898 Assert(replication_state->roident != InvalidRepOriginId);
901 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
902 * and the standby gets the message. Primarily this will be called during
903 * WAL replay (of commit records) where no WAL logging is necessary.
907 xl_replorigin_set xlrec;
909 xlrec.remote_lsn = remote_commit;
910 xlrec.node_id = node;
911 xlrec.force = go_backward;
914 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
916 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
920 * Due to - harmless - race conditions during a checkpoint we could see
921 * values here that are older than the ones we already have in memory.
922 * Don't overwrite those.
924 if (go_backward || replication_state->remote_lsn < remote_commit)
925 replication_state->remote_lsn = remote_commit;
926 if (local_commit != InvalidXLogRecPtr &&
927 (go_backward || replication_state->local_lsn < local_commit))
928 replication_state->local_lsn = local_commit;
929 LWLockRelease(&replication_state->lock);
932 * Release *after* changing the LSNs, slot isn't acquired and thus could
933 * otherwise be dropped anytime.
935 LWLockRelease(ReplicationOriginLock);
940 replorigin_get_progress(RepOriginId node, bool flush)
943 XLogRecPtr local_lsn = InvalidXLogRecPtr;
944 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
946 /* prevent slots from being concurrently dropped */
947 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
949 for (i = 0; i < max_replication_slots; i++)
951 ReplicationState *state;
953 state = &replication_states[i];
955 if (state->roident == node)
957 LWLockAcquire(&state->lock, LW_SHARED);
959 remote_lsn = state->remote_lsn;
960 local_lsn = state->local_lsn;
962 LWLockRelease(&state->lock);
968 LWLockRelease(ReplicationOriginLock);
970 if (flush && local_lsn != InvalidXLogRecPtr)
971 XLogFlush(local_lsn);
977 * Tear down a (possibly) configured session replication origin during process
981 ReplicationOriginExitCleanup(int code, Datum arg)
983 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
985 if (session_replication_state != NULL &&
986 session_replication_state->acquired_by == MyProcPid)
988 session_replication_state->acquired_by = 0;
989 session_replication_state = NULL;
992 LWLockRelease(ReplicationOriginLock);
996 * Setup a replication origin in the shared memory struct if it doesn't
997 * already exist and cache access to the specific ReplicationState so the
998 * array doesn't have to be searched when calling
999 * replorigin_session_advance().
1001 * Obviously only one such cached origin can exist per process and the current
1002 * cached value can only be set again after the previous value is torn down
1003 * with replorigin_session_reset().
1006 replorigin_session_setup(RepOriginId node)
1008 static bool registered_cleanup;
1012 if (!registered_cleanup)
1014 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1015 registered_cleanup = true;
1018 Assert(max_replication_slots > 0);
1020 if (session_replication_state != NULL)
1022 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1023 errmsg("cannot setup replication origin when one is already setup")));
1025 /* Lock exclusively, as we may have to create a new table entry. */
1026 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1029 * Search for either an existing slot for the origin, or a free one we can
1032 for (i = 0; i < max_replication_slots; i++)
1034 ReplicationState *curstate = &replication_states[i];
1036 /* remember where to insert if necessary */
1037 if (curstate->roident == InvalidRepOriginId &&
1045 if (curstate->roident != node)
1048 else if (curstate->acquired_by != 0)
1051 (errcode(ERRCODE_OBJECT_IN_USE),
1052 errmsg("replication identifier %d is already active for pid %d",
1053 curstate->roident, curstate->acquired_by)));
1056 /* ok, found slot */
1057 session_replication_state = curstate;
1061 if (session_replication_state == NULL && free_slot == -1)
1063 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1064 errmsg("no free replication state slot could be found for replication origin with oid %u",
1066 errhint("Increase max_replication_slots and try again.")));
1067 else if (session_replication_state == NULL)
1069 /* initialize new slot */
1070 session_replication_state = &replication_states[free_slot];
1071 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1072 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1073 session_replication_state->roident = node;
1077 Assert(session_replication_state->roident != InvalidRepOriginId);
1079 session_replication_state->acquired_by = MyProcPid;
1081 LWLockRelease(ReplicationOriginLock);
1085 * Reset replay state previously setup in this session.
1087 * This function may only be called if an origin was setup with
1088 * replorigin_session_setup().
1091 replorigin_session_reset(void)
1093 Assert(max_replication_slots != 0);
1095 if (session_replication_state == NULL)
1097 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1098 errmsg("no replication origin is configured")));
1100 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1102 session_replication_state->acquired_by = 0;
1103 session_replication_state = NULL;
1105 LWLockRelease(ReplicationOriginLock);
1109 * Do the same work replorigin_advance() does, just on the session's
1110 * configured origin.
1112 * This is noticeably cheaper than using replorigin_advance().
1115 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1117 Assert(session_replication_state != NULL);
1118 Assert(session_replication_state->roident != InvalidRepOriginId);
1120 LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1121 if (session_replication_state->local_lsn < local_commit)
1122 session_replication_state->local_lsn = local_commit;
1123 if (session_replication_state->remote_lsn < remote_commit)
1124 session_replication_state->remote_lsn = remote_commit;
1125 LWLockRelease(&session_replication_state->lock);
1129 * Ask the machinery about the point up to which we successfully replayed
1130 * changes from an already setup replication origin.
1133 replorigin_session_get_progress(bool flush)
1135 XLogRecPtr remote_lsn;
1136 XLogRecPtr local_lsn;
1138 Assert(session_replication_state != NULL);
1140 LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1141 remote_lsn = session_replication_state->remote_lsn;
1142 local_lsn = session_replication_state->local_lsn;
1143 LWLockRelease(&session_replication_state->lock);
1145 if (flush && local_lsn != InvalidXLogRecPtr)
1146 XLogFlush(local_lsn);
1153 /* ---------------------------------------------------------------------------
1154 * SQL functions for working with replication origin.
1156 * These mostly should be fairly short wrappers around more generic functions.
1157 * ---------------------------------------------------------------------------
1161 * Create replication origin for the passed in name, and return the assigned
1165 pg_replication_origin_create(PG_FUNCTION_ARGS)
1168 RepOriginId roident;
1170 replorigin_check_prerequisites(false, false);
1172 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1173 roident = replorigin_create(name);
1177 PG_RETURN_OID(roident);
1181 * Drop replication origin.
1184 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1187 RepOriginId roident;
1189 replorigin_check_prerequisites(false, false);
1191 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1193 roident = replorigin_by_name(name, false);
1194 Assert(OidIsValid(roident));
1196 replorigin_drop(roident);
1204 * Return oid of a replication origin.
1207 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1210 RepOriginId roident;
1212 replorigin_check_prerequisites(false, false);
1214 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1215 roident = replorigin_by_name(name, true);
1219 if (OidIsValid(roident))
1220 PG_RETURN_OID(roident);
1225 * Setup a replication origin for this session.
1228 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1233 replorigin_check_prerequisites(true, false);
1235 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1236 origin = replorigin_by_name(name, false);
1237 replorigin_session_setup(origin);
1239 replorigin_session_origin = origin;
1247 * Reset previously setup origin in this session
1250 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1252 replorigin_check_prerequisites(true, false);
1254 replorigin_session_reset();
1256 replorigin_session_origin = InvalidRepOriginId;
1257 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1258 replorigin_session_origin_timestamp = 0;
1264 * Has a replication origin been setup for this session.
1267 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1269 replorigin_check_prerequisites(false, false);
1271 PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1276 * Return the replication progress for origin setup in the current session.
1278 * If 'flush' is set to true it is ensured that the returned value corresponds
1279 * to a local transaction that has been flushed. This is useful if asynchronous
1280 * commits are used when replaying replicated transactions.
/*
 * fmgr-callable function.  Argument 0 is the boolean 'flush' flag, which is
 * passed through to replorigin_session_get_progress().  Requires
 * session_replication_state to be non-NULL; otherwise
 * ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is reported.  The progress value
 * is returned as an LSN.
 */
1283 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1285 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1286 bool flush = PG_GETARG_BOOL(0);
1288 replorigin_check_prerequisites(true, false);
/* no session-level replication state: nothing to report progress for */
1290 if (session_replication_state == NULL)
1292 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1293 errmsg("no replication origin is configured")));
1295 remote_lsn = replorigin_session_get_progress(flush);
/*
 * An invalid LSN is special-cased before the normal return.
 * NOTE(review): the statement guarded by this test is not visible here;
 * presumably it returns SQL NULL -- confirm.
 */
1297 if (remote_lsn == InvalidXLogRecPtr)
1300 PG_RETURN_LSN(remote_lsn);
/*
 * fmgr-callable function.  Stores argument 0 (an LSN) in
 * replorigin_session_origin_lsn and argument 1 (a timestamptz) in
 * replorigin_session_origin_timestamp.  Requires session_replication_state
 * to be non-NULL; otherwise ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is
 * reported.
 */
1304 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1306 XLogRecPtr location = PG_GETARG_LSN(0);
1308 replorigin_check_prerequisites(true, false);
/* a session origin must have been configured before per-xact values are set */
1310 if (session_replication_state == NULL)
1312 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1313 errmsg("no replication origin is configured")));
1315 replorigin_session_origin_lsn = location;
1316 replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
/*
 * fmgr-callable function.  Clears the per-transaction origin LSN and
 * timestamp; replorigin_session_origin itself is not modified here.
 */
1322 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1324 replorigin_check_prerequisites(true, false);
1326 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1327 replorigin_session_origin_timestamp = 0;
/*
 * fmgr-callable function.  Argument 0 is the origin name (text), argument 1
 * the remote commit LSN.  Looks the origin up by name and calls
 * replorigin_advance() for it while holding RowExclusiveLock on
 * ReplicationOriginRelationId; the lock is released again before returning.
 */
1334 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1336 text *name = PG_GETARG_TEXT_P(0);
1337 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1340 replorigin_check_prerequisites(true, false);
1342 /* lock to prevent the replication origin from vanishing */
1343 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
/* lookup with the second argument false, as in the other by-name lookups that need an existing origin */
1345 node = replorigin_by_name(text_to_cstring(name), false);
1348 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1349 * xact hasn't committed yet. This is why this function should be used to
1350 * set up the initial replication state, but not for replay.
/* hence InvalidXLogRecPtr as the third argument; the two trailing flags are both true */
1352 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1353 true /* go backward */ , true /* wal log */ );
/* release the lock taken above, with the same lock mode */
1355 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1362 * Return the replication progress for an individual replication origin.
1364 * If 'flush' is set to true it is ensured that the returned value corresponds
1365 * to a local transaction that has been flushed. This is useful if asynchronous
1366 * commits are used when replaying replicated transactions.
/*
 * fmgr-callable function.  Argument 0 is the origin name, argument 1 the
 * boolean 'flush' flag.  Looks the origin up by name and returns the value of
 * replorigin_get_progress() for it as an LSN.
 */
1369 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1373 RepOriginId roident;
1374 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
/* both prerequisite flags are true here, unlike the session-level variant */
1376 replorigin_check_prerequisites(true, true);
1378 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1379 flush = PG_GETARG_BOOL(1);
/*
 * Lookup with the second argument false; the Assert below relies on the
 * lookup never handing back an invalid id in that mode.
 */
1381 roident = replorigin_by_name(name, false);
1382 Assert(OidIsValid(roident));
1384 remote_lsn = replorigin_get_progress(roident, flush);
/*
 * An invalid LSN is special-cased before the normal return.
 * NOTE(review): the statement guarded by this test is not visible here;
 * presumably it returns SQL NULL -- confirm.
 */
1386 if (remote_lsn == InvalidXLogRecPtr)
1389 PG_RETURN_LSN(remote_lsn);
1394 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1396 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1398 Tuplestorestate *tupstore;
1399 MemoryContext per_query_ctx;
1400 MemoryContext oldcontext;
1402 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1404 /* we want to return 0 rows if slot is set to zero */
1405 replorigin_check_prerequisites(false, true);
1407 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1409 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1410 errmsg("set-valued function called in context that cannot accept a set")));
1411 if (!(rsinfo->allowedModes & SFRM_Materialize))
1413 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1414 errmsg("materialize mode required, but it is not allowed in this context")));
1415 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1416 elog(ERROR, "return type must be a row type");
1418 if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1419 elog(ERROR, "wrong function definition");
1421 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1422 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1424 tupstore = tuplestore_begin_heap(true, false, work_mem);
1425 rsinfo->returnMode = SFRM_Materialize;
1426 rsinfo->setResult = tupstore;
1427 rsinfo->setDesc = tupdesc;
1429 MemoryContextSwitchTo(oldcontext);
1432 /* prevent slots from being concurrently dropped */
1433 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1436 * Iterate through all possible replication_states, display if they are
1437 * filled. Each entry's LSNs are read under that entry's own lock only, so
1438 * values from different entries may not reflect a single point in time.
1440 for (i = 0; i < max_replication_slots; i++)
1442 ReplicationState *state;
1443 Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1444 bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1447 state = &replication_states[i];
1449 /* unused slot, nothing to display */
1450 if (state->roident == InvalidRepOriginId)
1453 memset(values, 0, sizeof(values));
1454 memset(nulls, 1, sizeof(nulls));
1456 values[0] = ObjectIdGetDatum(state->roident);
1460 * We're not preventing the origin to be dropped concurrently, so
1461 * silently accept that it might be gone.
1463 if (replorigin_by_oid(state->roident, true,
1466 values[1] = CStringGetTextDatum(roname);
1470 LWLockAcquire(&state->lock, LW_SHARED);
1472 values[2] = LSNGetDatum(state->remote_lsn);
1475 values[3] = LSNGetDatum(state->local_lsn);
1478 LWLockRelease(&state->lock);
1480 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1483 tuplestore_donestoring(tupstore);
1485 LWLockRelease(ReplicationOriginLock);
1487 #undef REPLICATION_ORIGIN_PROGRESS_COLS