]> granicus.if.org Git - postgresql/blob - src/backend/replication/slotfuncs.c
Fix typo in recent commit
[postgresql] / src / backend / replication / slotfuncs.c
1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  *         Support functions for replication slots
5  *
6  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *        src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13
14 #include "postgres.h"
15
16 #include "funcapi.h"
17 #include "miscadmin.h"
18
19 #include "access/htup_details.h"
20 #include "replication/decode.h"
21 #include "replication/slot.h"
22 #include "replication/logical.h"
23 #include "replication/logicalfuncs.h"
24 #include "utils/builtins.h"
25 #include "utils/inval.h"
26 #include "utils/pg_lsn.h"
27 #include "utils/resowner.h"
28
29 static void
30 check_permissions(void)
31 {
32         if (!superuser() && !has_rolreplication(GetUserId()))
33                 ereport(ERROR,
34                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
35                                  (errmsg("must be superuser or replication role to use replication slots"))));
36 }
37
38 /*
39  * SQL function for creating a new physical (streaming replication)
40  * replication slot.
41  */
42 Datum
43 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
44 {
45         Name            name = PG_GETARG_NAME(0);
46         bool            immediately_reserve = PG_GETARG_BOOL(1);
47         bool            temporary = PG_GETARG_BOOL(2);
48         Datum           values[2];
49         bool            nulls[2];
50         TupleDesc       tupdesc;
51         HeapTuple       tuple;
52         Datum           result;
53
54         Assert(!MyReplicationSlot);
55
56         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
57                 elog(ERROR, "return type must be a row type");
58
59         check_permissions();
60
61         CheckSlotRequirements();
62
63         /* acquire replication slot, this will check for conflicting names */
64         ReplicationSlotCreate(NameStr(*name), false,
65                                                   temporary ? RS_TEMPORARY : RS_PERSISTENT);
66
67         values[0] = NameGetDatum(&MyReplicationSlot->data.name);
68         nulls[0] = false;
69
70         if (immediately_reserve)
71         {
72                 /* Reserve WAL as the user asked for it */
73                 ReplicationSlotReserveWal();
74
75                 /* Write this slot to disk */
76                 ReplicationSlotMarkDirty();
77                 ReplicationSlotSave();
78
79                 values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
80                 nulls[1] = false;
81         }
82         else
83         {
84                 nulls[1] = true;
85         }
86
87         tuple = heap_form_tuple(tupdesc, values, nulls);
88         result = HeapTupleGetDatum(tuple);
89
90         ReplicationSlotRelease();
91
92         PG_RETURN_DATUM(result);
93 }
94
95
96 /*
97  * SQL function for creating a new logical replication slot.
98  */
99 Datum
100 pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
101 {
102         Name            name = PG_GETARG_NAME(0);
103         Name            plugin = PG_GETARG_NAME(1);
104         bool            temporary = PG_GETARG_BOOL(2);
105
106         LogicalDecodingContext *ctx = NULL;
107
108         TupleDesc       tupdesc;
109         HeapTuple       tuple;
110         Datum           result;
111         Datum           values[2];
112         bool            nulls[2];
113
114         Assert(!MyReplicationSlot);
115
116         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
117                 elog(ERROR, "return type must be a row type");
118
119         check_permissions();
120
121         CheckLogicalDecodingRequirements();
122
123         /*
124          * Acquire a logical decoding slot, this will check for conflicting names.
125          * Initially create persistent slot as ephemeral - that allows us to
126          * nicely handle errors during initialization because it'll get dropped if
127          * this transaction fails. We'll make it persistent at the end. Temporary
128          * slots can be created as temporary from beginning as they get dropped on
129          * error as well.
130          */
131         ReplicationSlotCreate(NameStr(*name), true,
132                                                   temporary ? RS_TEMPORARY : RS_EPHEMERAL);
133
134         /*
135          * Create logical decoding context, to build the initial snapshot.
136          */
137         ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
138                                                                         false,  /* do not build snapshot */
139                                                                         logical_read_local_xlog_page, NULL, NULL,
140                                                                         NULL);
141
142         /* build initial snapshot, might take a while */
143         DecodingContextFindStartpoint(ctx);
144
145         values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
146         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
147
148         /* don't need the decoding context anymore */
149         FreeDecodingContext(ctx);
150
151         memset(nulls, 0, sizeof(nulls));
152
153         tuple = heap_form_tuple(tupdesc, values, nulls);
154         result = HeapTupleGetDatum(tuple);
155
156         /* ok, slot is now fully created, mark it as persistent if needed */
157         if (!temporary)
158                 ReplicationSlotPersist();
159         ReplicationSlotRelease();
160
161         PG_RETURN_DATUM(result);
162 }
163
164
165 /*
166  * SQL function for dropping a replication slot.
167  */
168 Datum
169 pg_drop_replication_slot(PG_FUNCTION_ARGS)
170 {
171         Name            name = PG_GETARG_NAME(0);
172
173         check_permissions();
174
175         CheckSlotRequirements();
176
177         ReplicationSlotDrop(NameStr(*name), true);
178
179         PG_RETURN_VOID();
180 }
181
182 /*
183  * pg_get_replication_slots - SQL SRF showing active replication slots.
184  */
185 Datum
186 pg_get_replication_slots(PG_FUNCTION_ARGS)
187 {
188 #define PG_GET_REPLICATION_SLOTS_COLS 11
189         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
190         TupleDesc       tupdesc;
191         Tuplestorestate *tupstore;
192         MemoryContext per_query_ctx;
193         MemoryContext oldcontext;
194         int                     slotno;
195
196         /* check to see if caller supports us returning a tuplestore */
197         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
198                 ereport(ERROR,
199                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200                                  errmsg("set-valued function called in context that cannot accept a set")));
201         if (!(rsinfo->allowedModes & SFRM_Materialize))
202                 ereport(ERROR,
203                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
204                                  errmsg("materialize mode required, but it is not " \
205                                                 "allowed in this context")));
206
207         /* Build a tuple descriptor for our result type */
208         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
209                 elog(ERROR, "return type must be a row type");
210
211         /*
212          * We don't require any special permission to see this function's data
213          * because nothing should be sensitive. The most critical being the slot
214          * name, which shouldn't contain anything particularly sensitive.
215          */
216
217         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
218         oldcontext = MemoryContextSwitchTo(per_query_ctx);
219
220         tupstore = tuplestore_begin_heap(true, false, work_mem);
221         rsinfo->returnMode = SFRM_Materialize;
222         rsinfo->setResult = tupstore;
223         rsinfo->setDesc = tupdesc;
224
225         MemoryContextSwitchTo(oldcontext);
226
227         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
228         for (slotno = 0; slotno < max_replication_slots; slotno++)
229         {
230                 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
231                 Datum           values[PG_GET_REPLICATION_SLOTS_COLS];
232                 bool            nulls[PG_GET_REPLICATION_SLOTS_COLS];
233
234                 ReplicationSlotPersistency persistency;
235                 TransactionId xmin;
236                 TransactionId catalog_xmin;
237                 XLogRecPtr      restart_lsn;
238                 XLogRecPtr      confirmed_flush_lsn;
239                 pid_t           active_pid;
240                 Oid                     database;
241                 NameData        slot_name;
242                 NameData        plugin;
243                 int                     i;
244
245                 if (!slot->in_use)
246                         continue;
247
248                 SpinLockAcquire(&slot->mutex);
249
250                 xmin = slot->data.xmin;
251                 catalog_xmin = slot->data.catalog_xmin;
252                 database = slot->data.database;
253                 restart_lsn = slot->data.restart_lsn;
254                 confirmed_flush_lsn = slot->data.confirmed_flush;
255                 namecpy(&slot_name, &slot->data.name);
256                 namecpy(&plugin, &slot->data.plugin);
257                 active_pid = slot->active_pid;
258                 persistency = slot->data.persistency;
259
260                 SpinLockRelease(&slot->mutex);
261
262                 memset(nulls, 0, sizeof(nulls));
263
264                 i = 0;
265                 values[i++] = NameGetDatum(&slot_name);
266
267                 if (database == InvalidOid)
268                         nulls[i++] = true;
269                 else
270                         values[i++] = NameGetDatum(&plugin);
271
272                 if (database == InvalidOid)
273                         values[i++] = CStringGetTextDatum("physical");
274                 else
275                         values[i++] = CStringGetTextDatum("logical");
276
277                 if (database == InvalidOid)
278                         nulls[i++] = true;
279                 else
280                         values[i++] = database;
281
282                 values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
283                 values[i++] = BoolGetDatum(active_pid != 0);
284
285                 if (active_pid != 0)
286                         values[i++] = Int32GetDatum(active_pid);
287                 else
288                         nulls[i++] = true;
289
290                 if (xmin != InvalidTransactionId)
291                         values[i++] = TransactionIdGetDatum(xmin);
292                 else
293                         nulls[i++] = true;
294
295                 if (catalog_xmin != InvalidTransactionId)
296                         values[i++] = TransactionIdGetDatum(catalog_xmin);
297                 else
298                         nulls[i++] = true;
299
300                 if (restart_lsn != InvalidXLogRecPtr)
301                         values[i++] = LSNGetDatum(restart_lsn);
302                 else
303                         nulls[i++] = true;
304
305                 if (confirmed_flush_lsn != InvalidXLogRecPtr)
306                         values[i++] = LSNGetDatum(confirmed_flush_lsn);
307                 else
308                         nulls[i++] = true;
309
310                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
311         }
312         LWLockRelease(ReplicationSlotControlLock);
313
314         tuplestore_donestoring(tupstore);
315
316         return (Datum) 0;
317 }
318
319 /*
320  * Helper function for advancing physical replication slot forward.
321  */
322 static XLogRecPtr
323 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
324 {
325         XLogRecPtr      retlsn = InvalidXLogRecPtr;
326
327         SpinLockAcquire(&MyReplicationSlot->mutex);
328         if (MyReplicationSlot->data.restart_lsn < moveto)
329         {
330                 MyReplicationSlot->data.restart_lsn = moveto;
331                 retlsn = moveto;
332         }
333         SpinLockRelease(&MyReplicationSlot->mutex);
334
335         return retlsn;
336 }
337
338 /*
339  * Helper function for advancing logical replication slot forward.
340  */
341 static XLogRecPtr
342 pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
343 {
344         LogicalDecodingContext *ctx;
345         ResourceOwner   old_resowner = CurrentResourceOwner;
346         XLogRecPtr              retlsn = InvalidXLogRecPtr;
347
348         PG_TRY();
349         {
350                 /* restart at slot's confirmed_flush */
351                 ctx = CreateDecodingContext(InvalidXLogRecPtr,
352                                                                         NIL,
353                                                                         true,
354                                                                         logical_read_local_xlog_page,
355                                                                         NULL, NULL, NULL);
356
357                 CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
358                                                                                                    "logical decoding");
359
360                 /* invalidate non-timetravel entries */
361                 InvalidateSystemCaches();
362
363                 /* Decode until we run out of records */
364                 while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
365                            (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
366                 {
367                         XLogRecord *record;
368                         char       *errm = NULL;
369
370                         record = XLogReadRecord(ctx->reader, startlsn, &errm);
371                         if (errm)
372                                 elog(ERROR, "%s", errm);
373
374                         /*
375                          * Now that we've set up the xlog reader state, subsequent calls
376                          * pass InvalidXLogRecPtr to say "continue from last record"
377                          */
378                         startlsn = InvalidXLogRecPtr;
379
380                         /*
381                          * The {begin_txn,change,commit_txn}_wrapper callbacks above will
382                          * store the description into our tuplestore.
383                          */
384                         if (record != NULL)
385                                 LogicalDecodingProcessRecord(ctx, ctx->reader);
386
387                         /* check limits */
388                         if (moveto <= ctx->reader->EndRecPtr)
389                                 break;
390
391                         CHECK_FOR_INTERRUPTS();
392                 }
393
394                 CurrentResourceOwner = old_resowner;
395
396                 if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
397                 {
398                         LogicalConfirmReceivedLocation(moveto);
399
400                         /*
401                          * If only the confirmed_flush_lsn has changed the slot won't get
402                          * marked as dirty by the above. Callers on the walsender
403                          * interface are expected to keep track of their own progress and
404                          * don't need it written out. But SQL-interface users cannot
405                          * specify their own start positions and it's harder for them to
406                          * keep track of their progress, so we should make more of an
407                          * effort to save it for them.
408                          *
409                          * Dirty the slot so it's written out at the next checkpoint.
410                          * We'll still lose its position on crash, as documented, but it's
411                          * better than always losing the position even on clean restart.
412                          */
413                         ReplicationSlotMarkDirty();
414                 }
415
416                 retlsn = MyReplicationSlot->data.confirmed_flush;
417
418                 /* free context, call shutdown callback */
419                 FreeDecodingContext(ctx);
420
421                 InvalidateSystemCaches();
422         }
423         PG_CATCH();
424         {
425                 /* clear all timetravel entries */
426                 InvalidateSystemCaches();
427
428                 PG_RE_THROW();
429         }
430         PG_END_TRY();
431
432         return retlsn;
433 }
434
435 /*
436  * SQL function for moving the position in a replication slot.
437  */
438 Datum
439 pg_replication_slot_advance(PG_FUNCTION_ARGS)
440 {
441         Name            slotname = PG_GETARG_NAME(0);
442         XLogRecPtr      moveto = PG_GETARG_LSN(1);
443         XLogRecPtr      endlsn;
444         XLogRecPtr      startlsn;
445         TupleDesc       tupdesc;
446         Datum           values[2];
447         bool            nulls[2];
448         HeapTuple       tuple;
449         Datum           result;
450
451         Assert(!MyReplicationSlot);
452
453         check_permissions();
454
455         if (XLogRecPtrIsInvalid(moveto))
456                 ereport(ERROR,
457                                 (errmsg("invalid target wal lsn")));
458
459         /* Build a tuple descriptor for our result type */
460         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
461                 elog(ERROR, "return type must be a row type");
462
463         /*
464          * We can't move slot past what's been flushed/replayed so clamp the
465          * target position accordingly.
466          */
467         if (!RecoveryInProgress())
468                 moveto = Min(moveto, GetFlushRecPtr());
469         else
470                 moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
471
472         /* Acquire the slot so we "own" it */
473         ReplicationSlotAcquire(NameStr(*slotname), true);
474
475         startlsn = MyReplicationSlot->data.confirmed_flush;
476         if (moveto < startlsn)
477         {
478                 ReplicationSlotRelease();
479                 ereport(ERROR,
480                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481                                  errmsg("cannot move slot to %X/%X, minimum is %X/%X",
482                                                 (uint32) (moveto >> 32), (uint32) moveto,
483                                                 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
484                                                 (uint32) (MyReplicationSlot->data.confirmed_flush))));
485         }
486
487         if (OidIsValid(MyReplicationSlot->data.database))
488                 endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
489         else
490                 endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
491
492         values[0] = NameGetDatum(&MyReplicationSlot->data.name);
493         nulls[0] = false;
494
495         /* Update the on disk state when lsn was updated. */
496         if (XLogRecPtrIsInvalid(endlsn))
497         {
498                 ReplicationSlotMarkDirty();
499                 ReplicationSlotsComputeRequiredXmin(false);
500                 ReplicationSlotsComputeRequiredLSN();
501                 ReplicationSlotSave();
502         }
503
504         ReplicationSlotRelease();
505
506         /* Return the reached position. */
507         values[1] = LSNGetDatum(endlsn);
508         nulls[1] = false;
509
510         tuple = heap_form_tuple(tupdesc, values, nulls);
511         result = HeapTupleGetDatum(tuple);
512
513         PG_RETURN_DATUM(result);
514 }