<indexterm>
<primary>pg_create_physical_replication_slot</primary>
</indexterm>
- <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
+ <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
</entry>
<entry>
(<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
<parameter>slot_name</parameter>. Streaming changes from a physical slot
is only possible with the streaming-replication protocol - see <xref
linkend="protocol-replication">. Corresponds to the replication protocol
- command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+ command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
+ second parameter, when <literal>true</>, specifies that the <acronym>LSN</>
+ for this replication slot be reserved immediately; the <acronym<LSN</>
+ is otherwise reserved on first connection from a streaming replication
+ client.
</entry>
</row>
<row>
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
+CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
+ IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+ OUT slot_name name, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+AS 'pg_create_physical_replication_slot';
+
CREATE OR REPLACE FUNCTION
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
- /*
- * The replication slot mechanism is used to prevent removal of required
- * WAL. As there is no interlock between this and checkpoints required WAL
- * could be removed before ReplicationSlotsComputeRequiredLSN() has been
- * called to prevent that. In the very unlikely case that this happens
- * we'll just retry.
- */
- while (true)
- {
- XLogSegNo segno;
-
- /*
- * Let's start with enough information if we can, so log a standby
- * snapshot and start decoding at exactly that position.
- */
- if (!RecoveryInProgress())
- {
- XLogRecPtr flushptr;
-
- /* start at current insert position */
- slot->data.restart_lsn = GetXLogInsertRecPtr();
-
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
-
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
- }
- else
- slot->data.restart_lsn = GetRedoRecPtr();
-
- /* prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
-
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- XLByteToSeg(slot->data.restart_lsn, segno);
- if (XLogGetLastRemovedSegno() < segno)
- break;
- }
-
+ ReplicationSlotReserveWal();
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
#include <sys/stat.h>
#include "access/transam.h"
+#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
#include "replication/slot.h"
errmsg("replication slots can only be used if wal_level >= archive")));
}
+/*
+ * Reserve WAL for the currently active slot.
+ *
+ * Compute and set restart_lsn in a manner that's appropriate for the type of
+ * the slot and concurrency safe.
+ */
+void
+ReplicationSlotReserveWal(void)
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(slot != NULL);
+ Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+
+ /*
+ * The replication slot mechanism is used to prevent removal of required
+ * WAL. As there is no interlock between this routine and checkpoints, WAL
+ * segments could concurrently be removed when a now stale return value of
+ * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+ * this happens we'll just retry.
+ */
+ while (true)
+ {
+ XLogSegNo segno;
+
+ /*
+ * For logical slots log a standby snapshot and start logical decoding
+ * at exactly that position. That allows the slot to start up more
+ * quickly.
+ *
+ * That's not needed (or indeed helpful) for physical slots as they'll
+ * start replay at the last logged checkpoint anyway. Instead return
+ * the location of the last redo LSN. While that slightly increases
+ * the chance that we have to retry, it's where a base backup has to
+ * start replay at.
+ */
+ if (!RecoveryInProgress() && SlotIsLogical(slot))
+ {
+ XLogRecPtr flushptr;
+
+ /* start at current insert position */
+ slot->data.restart_lsn = GetXLogInsertRecPtr();
+
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
+
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
+ else
+ {
+ slot->data.restart_lsn = GetRedoRecPtr();
+ }
+
+ /* prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
+
+ /*
+ * If all required WAL is still there, great, otherwise retry. The
+ * slot should prevent further removal of WAL, unless there's a
+ * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+ * the new restart_lsn above, so normally we should never need to loop
+ * more than twice.
+ */
+ XLByteToSeg(slot->data.restart_lsn, segno);
+ if (XLogGetLastRemovedSegno() < segno)
+ break;
+ }
+}
+
/*
* Flush all replication slots to disk.
*
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
+ bool immediately_reserve = PG_GETARG_BOOL(1);
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-
nulls[0] = false;
- nulls[1] = true;
+
+ if (immediately_reserve)
+ {
+ /* Reserve WAL as the user asked for it */
+ ReplicationSlotReserveWal();
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+
+ values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
+ nulls[1] = false;
+ }
+ else
+ {
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ nulls[1] = true;
+ }
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201508101
+#define CATALOG_VERSION_NO 201508111
#endif
DESCR("SP-GiST support for quad tree over range");
/* replication slots */
-DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
/* misc stuff */
extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);