1 /*-------------------------------------------------------------------------
4 * Support functions for replication slots
6 * Copyright (c) 2012-2018, PostgreSQL Global Development Group
9 * src/backend/replication/slotfuncs.c
11 *-------------------------------------------------------------------------
17 #include "miscadmin.h"
19 #include "access/htup_details.h"
20 #include "replication/decode.h"
21 #include "replication/slot.h"
22 #include "replication/logical.h"
23 #include "replication/logicalfuncs.h"
24 #include "utils/builtins.h"
25 #include "utils/inval.h"
26 #include "utils/pg_lsn.h"
27 #include "utils/resowner.h"
30 check_permissions(void)
32 if (!superuser() && !has_rolreplication(GetUserId()))
34 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
35 (errmsg("must be superuser or replication role to use replication slots"))));
39 * SQL function for creating a new physical (streaming replication)
43 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
45 Name name = PG_GETARG_NAME(0);
46 bool immediately_reserve = PG_GETARG_BOOL(1);
47 bool temporary = PG_GETARG_BOOL(2);
54 Assert(!MyReplicationSlot);
56 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
57 elog(ERROR, "return type must be a row type");
61 CheckSlotRequirements();
63 /* acquire replication slot, this will check for conflicting names */
64 ReplicationSlotCreate(NameStr(*name), false,
65 temporary ? RS_TEMPORARY : RS_PERSISTENT);
67 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
70 if (immediately_reserve)
72 /* Reserve WAL as the user asked for it */
73 ReplicationSlotReserveWal();
75 /* Write this slot to disk */
76 ReplicationSlotMarkDirty();
77 ReplicationSlotSave();
79 values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
87 tuple = heap_form_tuple(tupdesc, values, nulls);
88 result = HeapTupleGetDatum(tuple);
90 ReplicationSlotRelease();
92 PG_RETURN_DATUM(result);
97 * SQL function for creating a new logical replication slot.
100 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
102 Name name = PG_GETARG_NAME(0);
103 Name plugin = PG_GETARG_NAME(1);
104 bool temporary = PG_GETARG_BOOL(2);
106 LogicalDecodingContext *ctx = NULL;
114 Assert(!MyReplicationSlot);
116 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
117 elog(ERROR, "return type must be a row type");
121 CheckLogicalDecodingRequirements();
124 * Acquire a logical decoding slot, this will check for conflicting names.
125 * Initially create persistent slot as ephemeral - that allows us to
126 * nicely handle errors during initialization because it'll get dropped if
127 * this transaction fails. We'll make it persistent at the end. Temporary
128 * slots can be created as temporary from beginning as they get dropped on
131 ReplicationSlotCreate(NameStr(*name), true,
132 temporary ? RS_TEMPORARY : RS_EPHEMERAL);
135 * Create logical decoding context, to build the initial snapshot.
137 ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
138 false, /* do not build snapshot */
139 logical_read_local_xlog_page, NULL, NULL,
142 /* build initial snapshot, might take a while */
143 DecodingContextFindStartpoint(ctx);
145 values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
146 values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
148 /* don't need the decoding context anymore */
149 FreeDecodingContext(ctx);
151 memset(nulls, 0, sizeof(nulls));
153 tuple = heap_form_tuple(tupdesc, values, nulls);
154 result = HeapTupleGetDatum(tuple);
156 /* ok, slot is now fully created, mark it as persistent if needed */
158 ReplicationSlotPersist();
159 ReplicationSlotRelease();
161 PG_RETURN_DATUM(result);
166 * SQL function for dropping a replication slot.
169 pg_drop_replication_slot(PG_FUNCTION_ARGS)
171 Name name = PG_GETARG_NAME(0);
175 CheckSlotRequirements();
177 ReplicationSlotDrop(NameStr(*name), true);
183 * pg_get_replication_slots - SQL SRF showing active replication slots.
186 pg_get_replication_slots(PG_FUNCTION_ARGS)
188 #define PG_GET_REPLICATION_SLOTS_COLS 11
189 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
191 Tuplestorestate *tupstore;
192 MemoryContext per_query_ctx;
193 MemoryContext oldcontext;
196 /* check to see if caller supports us returning a tuplestore */
197 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
199 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200 errmsg("set-valued function called in context that cannot accept a set")));
201 if (!(rsinfo->allowedModes & SFRM_Materialize))
203 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
204 errmsg("materialize mode required, but it is not " \
205 "allowed in this context")));
207 /* Build a tuple descriptor for our result type */
208 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
209 elog(ERROR, "return type must be a row type");
212 * We don't require any special permission to see this function's data
213 * because nothing should be sensitive. The most critical being the slot
214 * name, which shouldn't contain anything particularly sensitive.
217 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
218 oldcontext = MemoryContextSwitchTo(per_query_ctx);
220 tupstore = tuplestore_begin_heap(true, false, work_mem);
221 rsinfo->returnMode = SFRM_Materialize;
222 rsinfo->setResult = tupstore;
223 rsinfo->setDesc = tupdesc;
225 MemoryContextSwitchTo(oldcontext);
227 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
228 for (slotno = 0; slotno < max_replication_slots; slotno++)
230 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
231 Datum values[PG_GET_REPLICATION_SLOTS_COLS];
232 bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
234 ReplicationSlotPersistency persistency;
236 TransactionId catalog_xmin;
237 XLogRecPtr restart_lsn;
238 XLogRecPtr confirmed_flush_lsn;
248 SpinLockAcquire(&slot->mutex);
250 xmin = slot->data.xmin;
251 catalog_xmin = slot->data.catalog_xmin;
252 database = slot->data.database;
253 restart_lsn = slot->data.restart_lsn;
254 confirmed_flush_lsn = slot->data.confirmed_flush;
255 namecpy(&slot_name, &slot->data.name);
256 namecpy(&plugin, &slot->data.plugin);
257 active_pid = slot->active_pid;
258 persistency = slot->data.persistency;
260 SpinLockRelease(&slot->mutex);
262 memset(nulls, 0, sizeof(nulls));
265 values[i++] = NameGetDatum(&slot_name);
267 if (database == InvalidOid)
270 values[i++] = NameGetDatum(&plugin);
272 if (database == InvalidOid)
273 values[i++] = CStringGetTextDatum("physical");
275 values[i++] = CStringGetTextDatum("logical");
277 if (database == InvalidOid)
280 values[i++] = database;
282 values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
283 values[i++] = BoolGetDatum(active_pid != 0);
286 values[i++] = Int32GetDatum(active_pid);
290 if (xmin != InvalidTransactionId)
291 values[i++] = TransactionIdGetDatum(xmin);
295 if (catalog_xmin != InvalidTransactionId)
296 values[i++] = TransactionIdGetDatum(catalog_xmin);
300 if (restart_lsn != InvalidXLogRecPtr)
301 values[i++] = LSNGetDatum(restart_lsn);
305 if (confirmed_flush_lsn != InvalidXLogRecPtr)
306 values[i++] = LSNGetDatum(confirmed_flush_lsn);
310 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
312 LWLockRelease(ReplicationSlotControlLock);
314 tuplestore_donestoring(tupstore);
320 * Helper function for advancing physical replication slot forward.
323 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
325 XLogRecPtr retlsn = InvalidXLogRecPtr;
327 SpinLockAcquire(&MyReplicationSlot->mutex);
328 if (MyReplicationSlot->data.restart_lsn < moveto)
330 MyReplicationSlot->data.restart_lsn = moveto;
333 SpinLockRelease(&MyReplicationSlot->mutex);
339 * Helper function for advancing logical replication slot forward.
342 pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
344 LogicalDecodingContext *ctx;
345 ResourceOwner old_resowner = CurrentResourceOwner;
346 XLogRecPtr retlsn = InvalidXLogRecPtr;
350 /* restart at slot's confirmed_flush */
351 ctx = CreateDecodingContext(InvalidXLogRecPtr,
354 logical_read_local_xlog_page,
357 CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
360 /* invalidate non-timetravel entries */
361 InvalidateSystemCaches();
363 /* Decode until we run out of records */
364 while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
365 (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
370 record = XLogReadRecord(ctx->reader, startlsn, &errm);
372 elog(ERROR, "%s", errm);
375 * Now that we've set up the xlog reader state, subsequent calls
376 * pass InvalidXLogRecPtr to say "continue from last record"
378 startlsn = InvalidXLogRecPtr;
381 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
382 * store the description into our tuplestore.
385 LogicalDecodingProcessRecord(ctx, ctx->reader);
388 if (moveto <= ctx->reader->EndRecPtr)
391 CHECK_FOR_INTERRUPTS();
394 CurrentResourceOwner = old_resowner;
396 if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
398 LogicalConfirmReceivedLocation(moveto);
401 * If only the confirmed_flush_lsn has changed the slot won't get
402 * marked as dirty by the above. Callers on the walsender
403 * interface are expected to keep track of their own progress and
404 * don't need it written out. But SQL-interface users cannot
405 * specify their own start positions and it's harder for them to
406 * keep track of their progress, so we should make more of an
407 * effort to save it for them.
409 * Dirty the slot so it's written out at the next checkpoint.
410 * We'll still lose its position on crash, as documented, but it's
411 * better than always losing the position even on clean restart.
413 ReplicationSlotMarkDirty();
416 retlsn = MyReplicationSlot->data.confirmed_flush;
418 /* free context, call shutdown callback */
419 FreeDecodingContext(ctx);
421 InvalidateSystemCaches();
425 /* clear all timetravel entries */
426 InvalidateSystemCaches();
436 * SQL function for moving the position in a replication slot.
439 pg_replication_slot_advance(PG_FUNCTION_ARGS)
441 Name slotname = PG_GETARG_NAME(0);
442 XLogRecPtr moveto = PG_GETARG_LSN(1);
451 Assert(!MyReplicationSlot);
455 if (XLogRecPtrIsInvalid(moveto))
457 (errmsg("invalid target wal lsn")));
459 /* Build a tuple descriptor for our result type */
460 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
461 elog(ERROR, "return type must be a row type");
464 * We can't move slot past what's been flushed/replayed so clamp the
465 * target position accordingly.
467 if (!RecoveryInProgress())
468 moveto = Min(moveto, GetFlushRecPtr());
470 moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
472 /* Acquire the slot so we "own" it */
473 ReplicationSlotAcquire(NameStr(*slotname), true);
475 startlsn = MyReplicationSlot->data.confirmed_flush;
476 if (moveto < startlsn)
478 ReplicationSlotRelease();
480 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481 errmsg("cannot move slot to %X/%X, minimum is %X/%X",
482 (uint32) (moveto >> 32), (uint32) moveto,
483 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
484 (uint32) (MyReplicationSlot->data.confirmed_flush))));
487 if (OidIsValid(MyReplicationSlot->data.database))
488 endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
490 endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
492 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
495 /* Update the on disk state when lsn was updated. */
496 if (XLogRecPtrIsInvalid(endlsn))
498 ReplicationSlotMarkDirty();
499 ReplicationSlotsComputeRequiredXmin(false);
500 ReplicationSlotsComputeRequiredLSN();
501 ReplicationSlotSave();
504 ReplicationSlotRelease();
506 /* Return the reached position. */
507 values[1] = LSNGetDatum(endlsn);
510 tuple = heap_form_tuple(tupdesc, values, nulls);
511 result = HeapTupleGetDatum(tuple);
513 PG_RETURN_DATUM(result);