subscription at the commit of the transaction where this command is run.
</para>
+ <para>
+ <command>CREATE SUBSCRIPTION</command> cannot be executed inside a
+ transaction block when <literal>CREATE SLOT</literal> is specified.
+ </para>
+
<para>
Additional info about subscriptions and logical replication as a whole
can is available at <xref linkend="logical-replication-subscription"> and
</para>
<para>
- The replication worker associated with the subscription will not stop until
- after the transaction that issued this command has committed.
+ <command>DROP SUBSCRIPTION</command> cannot be executed inside a
+ transaction block when <literal>DROP SLOT</literal> is specified.
</para>
</refsect1>
#include "access/heapam.h"
#include "access/htup_details.h"
+#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
* Create new subscription.
*/
ObjectAddress
-CreateSubscription(CreateSubscriptionStmt *stmt)
+CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
bool create_slot;
List *publications;
+ /*
+ * Parse and check options.
+ * Connection and publication should not be specified here.
+ */
+ parse_subscription_options(stmt->options, NULL, NULL,
+ &enabled_given, &enabled,
+ &create_slot, &slotname);
+
+ /*
+ * Since creating a replication slot is not transactional, rolling back
+ * the transaction leaves the created replication slot. So we cannot run
+ * CREATE SUBSCRIPTION inside a transaction block if creating a
+ * replication slot.
+ */
+ if (create_slot)
+ PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
+
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
stmt->subname)));
}
- /*
- * Parse and check options.
- * Connection and publication should not be specified here.
- */
- parse_subscription_options(stmt->options, NULL, NULL,
- &enabled_given, &enabled,
- &create_slot, &slotname);
if (slotname == NULL)
slotname = stmt->subname;
* Drop a subscription
*/
void
-DropSubscription(DropSubscriptionStmt *stmt)
+DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
+ /*
+ * Since dropping a replication slot is not transactional, the replication
+ * slot stays dropped even if the transaction rolls back. So we cannot
+ * run DROP SUBSCRIPTION inside a transaction block if dropping the
+ * replication slot.
+ */
+ if (stmt->drop_slot)
+ PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
+
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
break;
case T_CreateSubscriptionStmt:
- address = CreateSubscription((CreateSubscriptionStmt *) parsetree);
+ address = CreateSubscription((CreateSubscriptionStmt *) parsetree,
+ isTopLevel);
break;
case T_AlterSubscriptionStmt:
break;
case T_DropSubscriptionStmt:
- DropSubscription((DropSubscriptionStmt *) parsetree);
+ DropSubscription((DropSubscriptionStmt *) parsetree, isTopLevel);
/* no commands stashed for DROP */
commandCollected = true;
break;
#include "catalog/objectaddress.h"
#include "nodes/parsenodes.h"
-extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt);
+extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
+ bool isTopLevel);
extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
-extern void DropSubscription(DropSubscriptionStmt *stmt);
+extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo;
^
set client_min_messages to error;
+-- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
+BEGIN;
+CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
+ERROR: CREATE SUBSCRIPTION ... CREATE SLOT cannot run inside a transaction block
+COMMIT;
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string
testsub_foo | regress_subscription_user | f | {testpub,testpub1}
(1 row)
+-- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
+BEGIN;
+DROP SUBSCRIPTION testsub DROP SLOT;
+ERROR: DROP SUBSCRIPTION ... DROP SLOT cannot run inside a transaction block
+COMMIT;
+BEGIN;
DROP SUBSCRIPTION testsub_foo NODROP SLOT;
+COMMIT;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
CREATE SUBSCRIPTION testsub PUBLICATION foo;
set client_min_messages to error;
+-- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
+BEGIN;
+CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
+COMMIT;
+
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT);
reset client_min_messages;
\dRs
+-- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block
+BEGIN;
+DROP SUBSCRIPTION testsub DROP SLOT;
+COMMIT;
+
+BEGIN;
DROP SUBSCRIPTION testsub_foo NODROP SLOT;
+COMMIT;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;