]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/origin.c
Fix "sesssion" typo
[postgresql] / src / backend / replication / logical / origin.c
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  *        Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2015, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *        src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to store and persist replication progress in an efficient
16  *       and durable manner.
17  *
18  * A replication origin consists of a descriptive, user defined, external
19  * name and a short, thus space efficient, internal 2 byte one. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory and long descriptors would be inefficient.  For now we only use 2
22  * bytes for the internal id of a replication origin as it seems unlikely that
23  * there soon will be more than 65k nodes in one replication setup; and using
24  * only two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationStates) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows to increase replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows to recover the
32  * precise replayed state after crash recovery; without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently.  For efficiency and simplicity
37  * reasons a backend can setup one replication origin that's from then used as
38  * the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  *       pg_replication_origin is required for the duration. That allows us to
49  *       safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  *       LWLock has to be held exclusively; when iterating over the replication
53  *       progress a shared lock has to be held, the same when advancing the
54  *       replication progress of an origin that has not been set up as the
55  *       session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  *       replication progress slot that slot's lwlock has to be held. That's
59  *       primarily because we do not assume 8 byte writes (the LSN) is atomic on
60  *       all our platforms, but it also simplifies memory ordering concerns
61  *       between the remote and local lsn. We use a lwlock instead of a spinlock
62  *       so it's less harmful to hold the lock over a WAL write
63  *       (c.f. AdvanceReplicationProgress).
64  *
65  * ---------------------------------------------------------------------------
66  */
67
68 #include "postgres.h"
69
70 #include <unistd.h>
71 #include <sys/stat.h>
72
73 #include "funcapi.h"
74 #include "miscadmin.h"
75
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80
81 #include "catalog/indexing.h"
82
83 #include "nodes/execnodes.h"
84
85 #include "replication/origin.h"
86 #include "replication/logical.h"
87
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
92
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99
100 /*
101  * Replay progress of a single remote node.
102  */
103 typedef struct ReplicationState
104 {
105         /*
106          * Local identifier for the remote node.
107          */
108         RepOriginId roident;
109
110         /*
111          * Location of the latest commit from the remote side.
112          */
113         XLogRecPtr      remote_lsn;
114
115         /*
116          * Remember the local lsn of the commit record so we can XLogFlush() to it
117          * during a checkpoint so we know the commit record actually is safe on
118          * disk.
119          */
120         XLogRecPtr      local_lsn;
121
122         /*
123          * Slot is setup in backend?
124          */
125         pid_t           acquired_by;
126
127         /*
128          * Lock protecting remote_lsn and local_lsn.
129          */
130         LWLock          lock;
131 } ReplicationState;
132
133 /*
134  * On disk version of ReplicationState.
135  */
136 typedef struct ReplicationStateOnDisk
137 {
138         RepOriginId roident;
139         XLogRecPtr      remote_lsn;
140 } ReplicationStateOnDisk;
141
142
143 typedef struct ReplicationStateCtl
144 {
145         int                     tranche_id;
146         LWLockTranche tranche;
147         ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
148 } ReplicationStateCtl;
149
150 /* external variables */
151 RepOriginId replorigin_session_origin = InvalidRepOriginId;     /* assumed identity */
152 XLogRecPtr      replorigin_session_origin_lsn = InvalidXLogRecPtr;
153 TimestampTz replorigin_session_origin_timestamp = 0;
154
155 /*
156  * Base address into a shared memory array of replication states of size
157  * max_replication_slots.
158  *
159  * XXX: Should we use a separate variable to size this rather than
160  * max_replication_slots?
161  */
162 static ReplicationState *replication_states;
163 static ReplicationStateCtl *replication_states_ctl;
164
165 /*
166  * Backend-local, cached element from ReplicationStates for use in a backend
167  * replaying remote commits, so we don't have to search ReplicationStates for
168  * the backends current RepOriginId.
169  */
170 static ReplicationState *session_replication_state = NULL;
171
172 /* Magic for on disk files. */
173 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
174
175 static void
176 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
177 {
178         if (!superuser())
179                 ereport(ERROR,
180                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
181                                  errmsg("only superusers can query or manipulate replication origins")));
182
183         if (check_slots && max_replication_slots == 0)
184                 ereport(ERROR,
185                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
186                                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
187
188         if (!recoveryOK && RecoveryInProgress())
189                 ereport(ERROR,
190                                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
191                    errmsg("cannot manipulate replication origins during recovery")));
192
193 }
194
195
196 /* ---------------------------------------------------------------------------
197  * Functions for working with replication origins themselves.
198  * ---------------------------------------------------------------------------
199  */
200
201 /*
202  * Check for a persistent replication origin identified by name.
203  *
204  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
205  */
206 RepOriginId
207 replorigin_by_name(char *roname, bool missing_ok)
208 {
209         Form_pg_replication_origin ident;
210         Oid                     roident = InvalidOid;
211         HeapTuple       tuple;
212         Datum           roname_d;
213
214         roname_d = CStringGetTextDatum(roname);
215
216         tuple = SearchSysCache1(REPLORIGNAME, roname_d);
217         if (HeapTupleIsValid(tuple))
218         {
219                 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
220                 roident = ident->roident;
221                 ReleaseSysCache(tuple);
222         }
223         else if (!missing_ok)
224                 elog(ERROR, "cache lookup failed for replication origin '%s'",
225                          roname);
226
227         return roident;
228 }
229
230 /*
231  * Create a replication origin.
232  *
233  * Needs to be called in a transaction.
234  */
235 RepOriginId
236 replorigin_create(char *roname)
237 {
238         Oid                     roident;
239         HeapTuple       tuple = NULL;
240         Relation        rel;
241         Datum           roname_d;
242         SnapshotData SnapshotDirty;
243         SysScanDesc scan;
244         ScanKeyData key;
245
246         roname_d = CStringGetTextDatum(roname);
247
248         Assert(IsTransactionState());
249
250         /*
251          * We need the numeric replication origin to be 16bit wide, so we cannot
252          * rely on the normal oid allocation. Instead we simply scan
253          * pg_replication_origin for the first unused id. That's not particularly
254          * efficient, but this should be a fairly infrequent operation - we can
255          * easily spend a bit more code on this when it turns out it needs to be
256          * faster.
257          *
258          * We handle concurrency by taking an exclusive lock (allowing reads!)
259          * over the table for the duration of the search. Because we use a "dirty
260          * snapshot" we can read rows that other in-progress sessions have
261          * written, even though they would be invisible with normal snapshots. Due
262          * to the exclusive lock there's no danger that new rows can appear while
263          * we're checking.
264          */
265         InitDirtySnapshot(SnapshotDirty);
266
267         rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
268
269         for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
270         {
271                 bool            nulls[Natts_pg_replication_origin];
272                 Datum           values[Natts_pg_replication_origin];
273                 bool            collides;
274
275                 CHECK_FOR_INTERRUPTS();
276
277                 ScanKeyInit(&key,
278                                         Anum_pg_replication_origin_roident,
279                                         BTEqualStrategyNumber, F_OIDEQ,
280                                         ObjectIdGetDatum(roident));
281
282                 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
283                                                                   true /* indexOK */ ,
284                                                                   &SnapshotDirty,
285                                                                   1, &key);
286
287                 collides = HeapTupleIsValid(systable_getnext(scan));
288
289                 systable_endscan(scan);
290
291                 if (!collides)
292                 {
293                         /*
294                          * Ok, found an unused roident, insert the new row and do a CCI,
295                          * so our callers can look it up if they want to.
296                          */
297                         memset(&nulls, 0, sizeof(nulls));
298
299                         values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
300                         values[Anum_pg_replication_origin_roname - 1] = roname_d;
301
302                         tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
303                         simple_heap_insert(rel, tuple);
304                         CatalogUpdateIndexes(rel, tuple);
305                         CommandCounterIncrement();
306                         break;
307                 }
308         }
309
310         /* now release lock again,      */
311         heap_close(rel, ExclusiveLock);
312
313         if (tuple == NULL)
314                 ereport(ERROR,
315                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
316                                  errmsg("no free replication origin oid could be found")));
317
318         heap_freetuple(tuple);
319         return roident;
320 }
321
322
323 /*
324  * Drop replication origin.
325  *
326  * Needs to be called in a transaction.
327  */
328 void
329 replorigin_drop(RepOriginId roident)
330 {
331         HeapTuple       tuple = NULL;
332         Relation        rel;
333         int                     i;
334
335         Assert(IsTransactionState());
336
337         rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
338
339         /* cleanup the slot state info */
340         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
341
342         for (i = 0; i < max_replication_slots; i++)
343         {
344                 ReplicationState *state = &replication_states[i];
345
346                 /* found our slot */
347                 if (state->roident == roident)
348                 {
349                         if (state->acquired_by != 0)
350                         {
351                                 ereport(ERROR,
352                                                 (errcode(ERRCODE_OBJECT_IN_USE),
353                                                  errmsg("cannot drop replication origin with oid %d, in use by pid %d",
354                                                                 state->roident,
355                                                                 state->acquired_by)));
356                         }
357
358                         /* first WAL log */
359                         {
360                                 xl_replorigin_drop xlrec;
361
362                                 xlrec.node_id = roident;
363                                 XLogBeginInsert();
364                                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
365                                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
366                         }
367
368                         /* then reset the in-memory entry */
369                         state->roident = InvalidRepOriginId;
370                         state->remote_lsn = InvalidXLogRecPtr;
371                         state->local_lsn = InvalidXLogRecPtr;
372                         break;
373                 }
374         }
375         LWLockRelease(ReplicationOriginLock);
376
377         tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
378         if (!HeapTupleIsValid(tuple))
379                 elog(ERROR, "cache lookup failed for replication origin with oid %u",
380                          roident);
381
382         simple_heap_delete(rel, &tuple->t_self);
383         ReleaseSysCache(tuple);
384
385         CommandCounterIncrement();
386
387         /* now release lock again,      */
388         heap_close(rel, ExclusiveLock);
389 }
390
391
392 /*
393  * Lookup replication origin via it's oid and return the name.
394  *
395  * The external name is palloc'd in the calling context.
396  *
397  * Returns true if the origin is known, false otherwise.
398  */
399 bool
400 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
401 {
402         HeapTuple       tuple;
403         Form_pg_replication_origin ric;
404
405         Assert(OidIsValid((Oid) roident));
406         Assert(roident != InvalidRepOriginId);
407         Assert(roident != DoNotReplicateId);
408
409         tuple = SearchSysCache1(REPLORIGIDENT,
410                                                         ObjectIdGetDatum((Oid) roident));
411
412         if (HeapTupleIsValid(tuple))
413         {
414                 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
415                 *roname = text_to_cstring(&ric->roname);
416                 ReleaseSysCache(tuple);
417
418                 return true;
419         }
420         else
421         {
422                 *roname = NULL;
423
424                 if (!missing_ok)
425                         elog(ERROR, "cache lookup failed for replication origin with oid %u",
426                                  roident);
427
428                 return false;
429         }
430 }
431
432
433 /* ---------------------------------------------------------------------------
434  * Functions for handling replication progress.
435  * ---------------------------------------------------------------------------
436  */
437
438 Size
439 ReplicationOriginShmemSize(void)
440 {
441         Size            size = 0;
442
443         /*
444          * XXX: max_replication_slots is arguably the wrong thing to use, as here
445          * we keep the replay state of *remote* transactions. But for now it seems
446          * sufficient to reuse it, lest we introduce a separate guc.
447          */
448         if (max_replication_slots == 0)
449                 return size;
450
451         size = add_size(size, offsetof(ReplicationStateCtl, states));
452
453         size = add_size(size,
454                                   mul_size(max_replication_slots, sizeof(ReplicationState)));
455         return size;
456 }
457
458 void
459 ReplicationOriginShmemInit(void)
460 {
461         bool            found;
462
463         if (max_replication_slots == 0)
464                 return;
465
466         replication_states_ctl = (ReplicationStateCtl *)
467                 ShmemInitStruct("ReplicationOriginState",
468                                                 ReplicationOriginShmemSize(),
469                                                 &found);
470         replication_states = replication_states_ctl->states;
471
472         if (!found)
473         {
474                 int                     i;
475
476                 replication_states_ctl->tranche_id = LWLockNewTrancheId();
477                 replication_states_ctl->tranche.name = "ReplicationOrigins";
478                 replication_states_ctl->tranche.array_base =
479                         &replication_states[0].lock;
480                 replication_states_ctl->tranche.array_stride =
481                         sizeof(ReplicationState);
482
483                 MemSet(replication_states, 0, ReplicationOriginShmemSize());
484
485                 for (i = 0; i < max_replication_slots; i++)
486                         LWLockInitialize(&replication_states[i].lock,
487                                                          replication_states_ctl->tranche_id);
488         }
489
490         LWLockRegisterTranche(replication_states_ctl->tranche_id,
491                                                   &replication_states_ctl->tranche);
492 }
493
494 /* ---------------------------------------------------------------------------
495  * Perform a checkpoint of each replication origin's progress with respect to
496  * the replayed remote_lsn. Make sure that all transactions we refer to in the
497  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
498  * if the transactions were originally committed asynchronously.
499  *
500  * We store checkpoints in the following format:
501  * +-------+------------------------+------------------+-----+--------+
502  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
503  * +-------+------------------------+------------------+-----+--------+
504  *
505  * So its just the magic, followed by the statically sized
506  * ReplicationStateOnDisk structs. Note that the maximum number of
507  * ReplicationStates is determined by max_replication_slots.
508  * ---------------------------------------------------------------------------
509  */
510 void
511 CheckPointReplicationOrigin(void)
512 {
513         const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
514         const char *path = "pg_logical/replorigin_checkpoint";
515         int                     tmpfd;
516         int                     i;
517         uint32          magic = REPLICATION_STATE_MAGIC;
518         pg_crc32c       crc;
519
520         if (max_replication_slots == 0)
521                 return;
522
523         INIT_CRC32C(crc);
524
525         /* make sure no old temp file is remaining */
526         if (unlink(tmppath) < 0 && errno != ENOENT)
527                 ereport(PANIC,
528                                 (errcode_for_file_access(),
529                                  errmsg("could not remove file \"%s\": %m",
530                                                 tmppath)));
531
532         /*
533          * no other backend can perform this at the same time, we're protected by
534          * CheckpointLock.
535          */
536         tmpfd = OpenTransientFile((char *) tmppath,
537                                                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
538                                                           S_IRUSR | S_IWUSR);
539         if (tmpfd < 0)
540                 ereport(PANIC,
541                                 (errcode_for_file_access(),
542                                  errmsg("could not create file \"%s\": %m",
543                                                 tmppath)));
544
545         /* write magic */
546         if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
547         {
548                 CloseTransientFile(tmpfd);
549                 ereport(PANIC,
550                                 (errcode_for_file_access(),
551                                  errmsg("could not write to file \"%s\": %m",
552                                                 tmppath)));
553         }
554         COMP_CRC32C(crc, &magic, sizeof(magic));
555
556         /* prevent concurrent creations/drops */
557         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
558
559         /* write actual data */
560         for (i = 0; i < max_replication_slots; i++)
561         {
562                 ReplicationStateOnDisk disk_state;
563                 ReplicationState *curstate = &replication_states[i];
564                 XLogRecPtr      local_lsn;
565
566                 if (curstate->roident == InvalidRepOriginId)
567                         continue;
568
569                 LWLockAcquire(&curstate->lock, LW_SHARED);
570
571                 disk_state.roident = curstate->roident;
572
573                 disk_state.remote_lsn = curstate->remote_lsn;
574                 local_lsn = curstate->local_lsn;
575
576                 LWLockRelease(&curstate->lock);
577
578                 /* make sure we only write out a commit that's persistent */
579                 XLogFlush(local_lsn);
580
581                 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
582                         sizeof(disk_state))
583                 {
584                         CloseTransientFile(tmpfd);
585                         ereport(PANIC,
586                                         (errcode_for_file_access(),
587                                          errmsg("could not write to file \"%s\": %m",
588                                                         tmppath)));
589                 }
590
591                 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
592         }
593
594         LWLockRelease(ReplicationOriginLock);
595
596         /* write out the CRC */
597         FIN_CRC32C(crc);
598         if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
599         {
600                 CloseTransientFile(tmpfd);
601                 ereport(PANIC,
602                                 (errcode_for_file_access(),
603                                  errmsg("could not write to file \"%s\": %m",
604                                                 tmppath)));
605         }
606
607         /* fsync the temporary file */
608         if (pg_fsync(tmpfd) != 0)
609         {
610                 CloseTransientFile(tmpfd);
611                 ereport(PANIC,
612                                 (errcode_for_file_access(),
613                                  errmsg("could not fsync file \"%s\": %m",
614                                                 tmppath)));
615         }
616
617         CloseTransientFile(tmpfd);
618
619         /* rename to permanent file, fsync file and directory */
620         if (rename(tmppath, path) != 0)
621         {
622                 ereport(PANIC,
623                                 (errcode_for_file_access(),
624                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
625                                                 tmppath, path)));
626         }
627
628         fsync_fname((char *) path, false);
629         fsync_fname("pg_logical", true);
630 }
631
632 /*
633  * Recover replication replay status from checkpoint data saved earlier by
634  * CheckPointReplicationOrigin.
635  *
636  * This only needs to be called at startup and *not* during every checkpoint
637  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
638  * state thereafter can be recovered by looking at commit records.
639  */
640 void
641 StartupReplicationOrigin(void)
642 {
643         const char *path = "pg_logical/replorigin_checkpoint";
644         int                     fd;
645         int                     readBytes;
646         uint32          magic = REPLICATION_STATE_MAGIC;
647         int                     last_state = 0;
648         pg_crc32c       file_crc;
649         pg_crc32c       crc;
650
651         /* don't want to overwrite already existing state */
652 #ifdef USE_ASSERT_CHECKING
653         static bool already_started = false;
654
655         Assert(!already_started);
656         already_started = true;
657 #endif
658
659         if (max_replication_slots == 0)
660                 return;
661
662         INIT_CRC32C(crc);
663
664         elog(DEBUG2, "starting up replication origin progress state");
665
666         fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
667
668         /*
669          * might have had max_replication_slots == 0 last run, or we just brought
670          * up a standby.
671          */
672         if (fd < 0 && errno == ENOENT)
673                 return;
674         else if (fd < 0)
675                 ereport(PANIC,
676                                 (errcode_for_file_access(),
677                                  errmsg("could not open file \"%s\": %m",
678                                                 path)));
679
680         /* verify magic, thats written even if nothing was active */
681         readBytes = read(fd, &magic, sizeof(magic));
682         if (readBytes != sizeof(magic))
683                 ereport(PANIC,
684                                 (errmsg("could not read file \"%s\": %m",
685                                                 path)));
686         COMP_CRC32C(crc, &magic, sizeof(magic));
687
688         if (magic != REPLICATION_STATE_MAGIC)
689                 ereport(PANIC,
690                    (errmsg("replication checkpoint has wrong magic %u instead of %u",
691                                    magic, REPLICATION_STATE_MAGIC)));
692
693         /* we can skip locking here, no other access is possible */
694
695         /* recover individual states, until there are no more to be found */
696         while (true)
697         {
698                 ReplicationStateOnDisk disk_state;
699
700                 readBytes = read(fd, &disk_state, sizeof(disk_state));
701
702                 /* no further data */
703                 if (readBytes == sizeof(crc))
704                 {
705                         /* not pretty, but simple ... */
706                         file_crc = *(pg_crc32c *) &disk_state;
707                         break;
708                 }
709
710                 if (readBytes < 0)
711                 {
712                         ereport(PANIC,
713                                         (errcode_for_file_access(),
714                                          errmsg("could not read file \"%s\": %m",
715                                                         path)));
716                 }
717
718                 if (readBytes != sizeof(disk_state))
719                 {
720                         ereport(PANIC,
721                                         (errcode_for_file_access(),
722                                          errmsg("could not read file \"%s\": read %d of %zu",
723                                                         path, readBytes, sizeof(disk_state))));
724                 }
725
726                 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
727
728                 if (last_state == max_replication_slots)
729                         ereport(PANIC,
730                                         (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
731                                          errmsg("no free replication state could be found, increase max_replication_slots")));
732
733                 /* copy data to shared memory */
734                 replication_states[last_state].roident = disk_state.roident;
735                 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
736                 last_state++;
737
738                 elog(LOG, "recovered replication state of node %u to %X/%X",
739                          disk_state.roident,
740                          (uint32) (disk_state.remote_lsn >> 32),
741                          (uint32) disk_state.remote_lsn);
742         }
743
744         /* now check checksum */
745         FIN_CRC32C(crc);
746         if (file_crc != crc)
747                 ereport(PANIC,
748                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
749                                  errmsg("replication_slot_checkpoint has wrong checksum %u, expected %u",
750                                                 crc, file_crc)));
751
752         CloseTransientFile(fd);
753 }
754
755 void
756 replorigin_redo(XLogReaderState *record)
757 {
758         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
759
760         switch (info)
761         {
762                 case XLOG_REPLORIGIN_SET:
763                         {
764                                 xl_replorigin_set *xlrec =
765                                 (xl_replorigin_set *) XLogRecGetData(record);
766
767                                 replorigin_advance(xlrec->node_id,
768                                                                    xlrec->remote_lsn, record->EndRecPtr,
769                                                                    xlrec->force /* backward */ ,
770                                                                    false /* WAL log */ );
771                                 break;
772                         }
773                 case XLOG_REPLORIGIN_DROP:
774                         {
775                                 xl_replorigin_drop *xlrec;
776                                 int                     i;
777
778                                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
779
780                                 for (i = 0; i < max_replication_slots; i++)
781                                 {
782                                         ReplicationState *state = &replication_states[i];
783
784                                         /* found our slot */
785                                         if (state->roident == xlrec->node_id)
786                                         {
787                                                 /* reset entry */
788                                                 state->roident = InvalidRepOriginId;
789                                                 state->remote_lsn = InvalidXLogRecPtr;
790                                                 state->local_lsn = InvalidXLogRecPtr;
791                                                 break;
792                                         }
793                                 }
794                                 break;
795                         }
796                 default:
797                         elog(PANIC, "replorigin_redo: unknown op code %u", info);
798         }
799 }
800
801
802 /*
803  * Tell the replication origin progress machinery that a commit from 'node'
804  * that originated at the LSN remote_commit on the remote node was replayed
805  * successfully and that we don't need to do so again. In combination with
806  * setting up replorigin_session_origin_lsn and replorigin_session_origin
807  * that ensures we won't loose knowledge about that after a crash if the
808  * transaction had a persistent effect (think of asynchronous commits).
809  *
810  * local_commit needs to be a local LSN of the commit so that we can make sure
811  * upon a checkpoint that enough WAL has been persisted to disk.
812  *
813  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
814  * unless running in recovery.
815  */
816 void
817 replorigin_advance(RepOriginId node,
818                                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
819                                    bool go_backward, bool wal_log)
820 {
821         int                     i;
822         ReplicationState *replication_state = NULL;
823         ReplicationState *free_state = NULL;
824
825         Assert(node != InvalidRepOriginId);
826
827         /* we don't track DoNotReplicateId */
828         if (node == DoNotReplicateId)
829                 return;
830
831         /*
832          * XXX: For the case where this is called by WAL replay, it'd be more
833          * efficient to restore into a backend local hashtable and only dump into
834          * shmem after recovery is finished. Let's wait with implementing that
835          * till it's shown to be a measurable expense
836          */
837
838         /* Lock exclusively, as we may have to create a new table entry. */
839         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
840
841         /*
842          * Search for either an existing slot for the origin, or a free one we can
843          * use.
844          */
845         for (i = 0; i < max_replication_slots; i++)
846         {
847                 ReplicationState *curstate = &replication_states[i];
848
849                 /* remember where to insert if necessary */
850                 if (curstate->roident == InvalidRepOriginId &&
851                         free_state == NULL)
852                 {
853                         free_state = curstate;
854                         continue;
855                 }
856
857                 /* not our slot */
858                 if (curstate->roident != node)
859                 {
860                         continue;
861                 }
862
863                 /* ok, found slot */
864                 replication_state = curstate;
865
866                 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
867
868                 /* Make sure it's not used by somebody else */
869                 if (replication_state->acquired_by != 0)
870                 {
871                         ereport(ERROR,
872                                         (errcode(ERRCODE_OBJECT_IN_USE),
873                                          errmsg("replication origin with oid %d is already active for pid %d",
874                                                         replication_state->roident,
875                                                         replication_state->acquired_by)));
876                 }
877
878                 break;
879         }
880
881         if (replication_state == NULL && free_state == NULL)
882                 ereport(ERROR,
883                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
884                                  errmsg("no free replication state slot could be found for replication origin with oid %u",
885                                                 node),
886                                  errhint("Increase max_replication_slots and try again.")));
887
888         if (replication_state == NULL)
889         {
890                 /* initialize new slot */
891                 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
892                 replication_state = free_state;
893                 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
894                 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
895                 replication_state->roident = node;
896         }
897
898         Assert(replication_state->roident != InvalidRepOriginId);
899
900         /*
901          * If somebody "forcefully" sets this slot, WAL log it, so it's durable
902          * and the standby gets the message. Primarily this will be called during
903          * WAL replay (of commit records) where no WAL logging is necessary.
904          */
905         if (wal_log)
906         {
907                 xl_replorigin_set xlrec;
908
909                 xlrec.remote_lsn = remote_commit;
910                 xlrec.node_id = node;
911                 xlrec.force = go_backward;
912
913                 XLogBeginInsert();
914                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
915
916                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
917         }
918
919         /*
920          * Due to - harmless - race conditions during a checkpoint we could see
921          * values here that are older than the ones we already have in memory.
922          * Don't overwrite those.
923          */
924         if (go_backward || replication_state->remote_lsn < remote_commit)
925                 replication_state->remote_lsn = remote_commit;
926         if (local_commit != InvalidXLogRecPtr &&
927                 (go_backward || replication_state->local_lsn < local_commit))
928                 replication_state->local_lsn = local_commit;
929         LWLockRelease(&replication_state->lock);
930
931         /*
932          * Release *after* changing the LSNs, slot isn't acquired and thus could
933          * otherwise be dropped anytime.
934          */
935         LWLockRelease(ReplicationOriginLock);
936 }
937
938
939 XLogRecPtr
940 replorigin_get_progress(RepOriginId node, bool flush)
941 {
942         int                     i;
943         XLogRecPtr      local_lsn = InvalidXLogRecPtr;
944         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
945
946         /* prevent slots from being concurrently dropped */
947         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
948
949         for (i = 0; i < max_replication_slots; i++)
950         {
951                 ReplicationState *state;
952
953                 state = &replication_states[i];
954
955                 if (state->roident == node)
956                 {
957                         LWLockAcquire(&state->lock, LW_SHARED);
958
959                         remote_lsn = state->remote_lsn;
960                         local_lsn = state->local_lsn;
961
962                         LWLockRelease(&state->lock);
963
964                         break;
965                 }
966         }
967
968         LWLockRelease(ReplicationOriginLock);
969
970         if (flush && local_lsn != InvalidXLogRecPtr)
971                 XLogFlush(local_lsn);
972
973         return remote_lsn;
974 }
975
976 /*
977  * Tear down a (possibly) configured session replication origin during process
978  * exit.
979  */
980 static void
981 ReplicationOriginExitCleanup(int code, Datum arg)
982 {
983         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
984
985         if (session_replication_state != NULL &&
986                 session_replication_state->acquired_by == MyProcPid)
987         {
988                 session_replication_state->acquired_by = 0;
989                 session_replication_state = NULL;
990         }
991
992         LWLockRelease(ReplicationOriginLock);
993 }
994
995 /*
996  * Setup a replication origin in the shared memory struct if it doesn't
997  * already exists and cache access to the specific ReplicationSlot so the
998  * array doesn't have to be searched when calling
999  * replorigin_session_advance().
1000  *
1001  * Obviously only one such cached origin can exist per process and the current
1002  * cached value can only be set again after the previous value is torn down
1003  * with replorigin_session_reset().
1004  */
1005 void
1006 replorigin_session_setup(RepOriginId node)
1007 {
1008         static bool registered_cleanup;
1009         int                     i;
1010         int                     free_slot = -1;
1011
1012         if (!registered_cleanup)
1013         {
1014                 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1015                 registered_cleanup = true;
1016         }
1017
1018         Assert(max_replication_slots > 0);
1019
1020         if (session_replication_state != NULL)
1021                 ereport(ERROR,
1022                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1023                 errmsg("cannot setup replication origin when one is already setup")));
1024
1025         /* Lock exclusively, as we may have to create a new table entry. */
1026         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1027
1028         /*
1029          * Search for either an existing slot for the origin, or a free one we can
1030          * use.
1031          */
1032         for (i = 0; i < max_replication_slots; i++)
1033         {
1034                 ReplicationState *curstate = &replication_states[i];
1035
1036                 /* remember where to insert if necessary */
1037                 if (curstate->roident == InvalidRepOriginId &&
1038                         free_slot == -1)
1039                 {
1040                         free_slot = i;
1041                         continue;
1042                 }
1043
1044                 /* not our slot */
1045                 if (curstate->roident != node)
1046                         continue;
1047
1048                 else if (curstate->acquired_by != 0)
1049                 {
1050                         ereport(ERROR,
1051                                         (errcode(ERRCODE_OBJECT_IN_USE),
1052                          errmsg("replication identifier %d is already active for pid %d",
1053                                         curstate->roident, curstate->acquired_by)));
1054                 }
1055
1056                 /* ok, found slot */
1057                 session_replication_state = curstate;
1058         }
1059
1060
1061         if (session_replication_state == NULL && free_slot == -1)
1062                 ereport(ERROR,
1063                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1064                                  errmsg("no free replication state slot could be found for replication origin with oid %u",
1065                                                 node),
1066                                  errhint("Increase max_replication_slots and try again.")));
1067         else if (session_replication_state == NULL)
1068         {
1069                 /* initialize new slot */
1070                 session_replication_state = &replication_states[free_slot];
1071                 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1072                 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1073                 session_replication_state->roident = node;
1074         }
1075
1076
1077         Assert(session_replication_state->roident != InvalidRepOriginId);
1078
1079         session_replication_state->acquired_by = MyProcPid;
1080
1081         LWLockRelease(ReplicationOriginLock);
1082 }
1083
1084 /*
1085  * Reset replay state previously setup in this session.
1086  *
1087  * This function may only be called if an origin was setup with
1088  * replorigin_session_setup().
1089  */
1090 void
1091 replorigin_session_reset(void)
1092 {
1093         Assert(max_replication_slots != 0);
1094
1095         if (session_replication_state == NULL)
1096                 ereport(ERROR,
1097                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1098                                  errmsg("no replication origin is configured")));
1099
1100         LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1101
1102         session_replication_state->acquired_by = 0;
1103         session_replication_state = NULL;
1104
1105         LWLockRelease(ReplicationOriginLock);
1106 }
1107
1108 /*
1109  * Do the same work replorigin_advance() does, just on the session's
1110  * configured origin.
1111  *
1112  * This is noticeably cheaper than using replorigin_advance().
1113  */
1114 void
1115 replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1116 {
1117         Assert(session_replication_state != NULL);
1118         Assert(session_replication_state->roident != InvalidRepOriginId);
1119
1120         LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1121         if (session_replication_state->local_lsn < local_commit)
1122                 session_replication_state->local_lsn = local_commit;
1123         if (session_replication_state->remote_lsn < remote_commit)
1124                 session_replication_state->remote_lsn = remote_commit;
1125         LWLockRelease(&session_replication_state->lock);
1126 }
1127
1128 /*
1129  * Ask the machinery about the point up to which we successfully replayed
1130  * changes from an already setup replication origin.
1131  */
1132 XLogRecPtr
1133 replorigin_session_get_progress(bool flush)
1134 {
1135         XLogRecPtr      remote_lsn;
1136         XLogRecPtr      local_lsn;
1137
1138         Assert(session_replication_state != NULL);
1139
1140         LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1141         remote_lsn = session_replication_state->remote_lsn;
1142         local_lsn = session_replication_state->local_lsn;
1143         LWLockRelease(&session_replication_state->lock);
1144
1145         if (flush && local_lsn != InvalidXLogRecPtr)
1146                 XLogFlush(local_lsn);
1147
1148         return remote_lsn;
1149 }
1150
1151
1152
1153 /* ---------------------------------------------------------------------------
1154  * SQL functions for working with replication origin.
1155  *
1156  * These mostly should be fairly short wrappers around more generic functions.
1157  * ---------------------------------------------------------------------------
1158  */
1159
1160 /*
1161  * Create replication origin for the passed in name, and return the assigned
1162  * oid.
1163  */
1164 Datum
1165 pg_replication_origin_create(PG_FUNCTION_ARGS)
1166 {
1167         char       *name;
1168         RepOriginId roident;
1169
1170         replorigin_check_prerequisites(false, false);
1171
1172         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1173         roident = replorigin_create(name);
1174
1175         pfree(name);
1176
1177         PG_RETURN_OID(roident);
1178 }
1179
1180 /*
1181  * Drop replication origin.
1182  */
1183 Datum
1184 pg_replication_origin_drop(PG_FUNCTION_ARGS)
1185 {
1186         char       *name;
1187         RepOriginId roident;
1188
1189         replorigin_check_prerequisites(false, false);
1190
1191         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1192
1193         roident = replorigin_by_name(name, false);
1194         Assert(OidIsValid(roident));
1195
1196         replorigin_drop(roident);
1197
1198         pfree(name);
1199
1200         PG_RETURN_VOID();
1201 }
1202
1203 /*
1204  * Return oid of a replication origin.
1205  */
1206 Datum
1207 pg_replication_origin_oid(PG_FUNCTION_ARGS)
1208 {
1209         char       *name;
1210         RepOriginId roident;
1211
1212         replorigin_check_prerequisites(false, false);
1213
1214         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1215         roident = replorigin_by_name(name, true);
1216
1217         pfree(name);
1218
1219         if (OidIsValid(roident))
1220                 PG_RETURN_OID(roident);
1221         PG_RETURN_NULL();
1222 }
1223
1224 /*
1225  * Setup a replication origin for this session.
1226  */
1227 Datum
1228 pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1229 {
1230         char       *name;
1231         RepOriginId origin;
1232
1233         replorigin_check_prerequisites(true, false);
1234
1235         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1236         origin = replorigin_by_name(name, false);
1237         replorigin_session_setup(origin);
1238
1239         replorigin_session_origin = origin;
1240
1241         pfree(name);
1242
1243         PG_RETURN_VOID();
1244 }
1245
1246 /*
1247  * Reset previously setup origin in this session
1248  */
1249 Datum
1250 pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1251 {
1252         replorigin_check_prerequisites(true, false);
1253
1254         replorigin_session_reset();
1255
1256         replorigin_session_origin = InvalidRepOriginId;
1257         replorigin_session_origin_lsn = InvalidXLogRecPtr;
1258         replorigin_session_origin_timestamp = 0;
1259
1260         PG_RETURN_VOID();
1261 }
1262
1263 /*
1264  * Has a replication origin been setup for this session.
1265  */
1266 Datum
1267 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1268 {
1269         replorigin_check_prerequisites(false, false);
1270
1271         PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1272 }
1273
1274
1275 /*
1276  * Return the replication progress for origin setup in the current session.
1277  *
1278  * If 'flush' is set to true it is ensured that the returned value corresponds
1279  * to a local transaction that has been flushed. this is useful if asychronous
1280  * commits are used when replaying replicated transactions.
1281  */
1282 Datum
1283 pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1284 {
1285         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
1286         bool            flush = PG_GETARG_BOOL(0);
1287
1288         replorigin_check_prerequisites(true, false);
1289
1290         if (session_replication_state == NULL)
1291                 ereport(ERROR,
1292                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1293                                  errmsg("no replication origin is configured")));
1294
1295         remote_lsn = replorigin_session_get_progress(flush);
1296
1297         if (remote_lsn == InvalidXLogRecPtr)
1298                 PG_RETURN_NULL();
1299
1300         PG_RETURN_LSN(remote_lsn);
1301 }
1302
1303 Datum
1304 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1305 {
1306         XLogRecPtr      location = PG_GETARG_LSN(0);
1307
1308         replorigin_check_prerequisites(true, false);
1309
1310         if (session_replication_state == NULL)
1311                 ereport(ERROR,
1312                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1313                                  errmsg("no replication origin is configured")));
1314
1315         replorigin_session_origin_lsn = location;
1316         replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1317
1318         PG_RETURN_VOID();
1319 }
1320
1321 Datum
1322 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1323 {
1324         replorigin_check_prerequisites(true, false);
1325
1326         replorigin_session_origin_lsn = InvalidXLogRecPtr;
1327         replorigin_session_origin_timestamp = 0;
1328
1329         PG_RETURN_VOID();
1330 }
1331
1332
1333 Datum
1334 pg_replication_origin_advance(PG_FUNCTION_ARGS)
1335 {
1336         text       *name = PG_GETARG_TEXT_P(0);
1337         XLogRecPtr      remote_commit = PG_GETARG_LSN(1);
1338         RepOriginId node;
1339
1340         replorigin_check_prerequisites(true, false);
1341
1342         /* lock to prevent the replication origin from vanishing */
1343         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1344
1345         node = replorigin_by_name(text_to_cstring(name), false);
1346
1347         /*
1348          * Can't sensibly pass a local commit to be flushed at checkpoint - this
1349          * xact hasn't committed yet. This is why this function should be used to
1350          * set up the initial replication state, but not for replay.
1351          */
1352         replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1353                                            true /* go backward */ , true /* wal log */ );
1354
1355         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1356
1357         PG_RETURN_VOID();
1358 }
1359
1360
1361 /*
1362  * Return the replication progress for an individual replication origin.
1363  *
1364  * If 'flush' is set to true it is ensured that the returned value corresponds
1365  * to a local transaction that has been flushed. this is useful if asychronous
1366  * commits are used when replaying replicated transactions.
1367  */
1368 Datum
1369 pg_replication_origin_progress(PG_FUNCTION_ARGS)
1370 {
1371         char       *name;
1372         bool            flush;
1373         RepOriginId roident;
1374         XLogRecPtr      remote_lsn = InvalidXLogRecPtr;
1375
1376         replorigin_check_prerequisites(true, true);
1377
1378         name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1379         flush = PG_GETARG_BOOL(1);
1380
1381         roident = replorigin_by_name(name, false);
1382         Assert(OidIsValid(roident));
1383
1384         remote_lsn = replorigin_get_progress(roident, flush);
1385
1386         if (remote_lsn == InvalidXLogRecPtr)
1387                 PG_RETURN_NULL();
1388
1389         PG_RETURN_LSN(remote_lsn);
1390 }
1391
1392
1393 Datum
1394 pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1395 {
1396         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1397         TupleDesc       tupdesc;
1398         Tuplestorestate *tupstore;
1399         MemoryContext per_query_ctx;
1400         MemoryContext oldcontext;
1401         int                     i;
1402 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1403
1404         /* we we want to return 0 rows if slot is set to zero */
1405         replorigin_check_prerequisites(false, true);
1406
1407         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1408                 ereport(ERROR,
1409                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1410                                  errmsg("set-valued function called in context that cannot accept a set")));
1411         if (!(rsinfo->allowedModes & SFRM_Materialize))
1412                 ereport(ERROR,
1413                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1414                                  errmsg("materialize mode required, but it is not allowed in this context")));
1415         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1416                 elog(ERROR, "return type must be a row type");
1417
1418         if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1419                 elog(ERROR, "wrong function definition");
1420
1421         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1422         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1423
1424         tupstore = tuplestore_begin_heap(true, false, work_mem);
1425         rsinfo->returnMode = SFRM_Materialize;
1426         rsinfo->setResult = tupstore;
1427         rsinfo->setDesc = tupdesc;
1428
1429         MemoryContextSwitchTo(oldcontext);
1430
1431
1432         /* prevent slots from being concurrently dropped */
1433         LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1434
1435         /*
1436          * Iterate through all possible replication_states, display if they are
1437          * filled. Note that we do not take any locks, so slightly corrupted/out
1438          * of date values are a possibility.
1439          */
1440         for (i = 0; i < max_replication_slots; i++)
1441         {
1442                 ReplicationState *state;
1443                 Datum           values[REPLICATION_ORIGIN_PROGRESS_COLS];
1444                 bool            nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1445                 char       *roname;
1446
1447                 state = &replication_states[i];
1448
1449                 /* unused slot, nothing to display */
1450                 if (state->roident == InvalidRepOriginId)
1451                         continue;
1452
1453                 memset(values, 0, sizeof(values));
1454                 memset(nulls, 1, sizeof(nulls));
1455
1456                 values[0] = ObjectIdGetDatum(state->roident);
1457                 nulls[0] = false;
1458
1459                 /*
1460                  * We're not preventing the origin to be dropped concurrently, so
1461                  * silently accept that it might be gone.
1462                  */
1463                 if (replorigin_by_oid(state->roident, true,
1464                                                           &roname))
1465                 {
1466                         values[1] = CStringGetTextDatum(roname);
1467                         nulls[1] = false;
1468                 }
1469
1470                 LWLockAcquire(&state->lock, LW_SHARED);
1471
1472                 values[2] = LSNGetDatum(state->remote_lsn);
1473                 nulls[2] = false;
1474
1475                 values[3] = LSNGetDatum(state->local_lsn);
1476                 nulls[3] = false;
1477
1478                 LWLockRelease(&state->lock);
1479
1480                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1481         }
1482
1483         tuplestore_donestoring(tupstore);
1484
1485         LWLockRelease(ReplicationOriginLock);
1486
1487 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1488
1489         return (Datum) 0;
1490 }