1 /*-------------------------------------------------------------------------
4 * subscription catalog manipulation functions
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
12 *-------------------------------------------------------------------------
17 #include "miscadmin.h"
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/xact.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
29 #include "catalog/pg_subscription.h"
30 #include "catalog/pg_subscription_rel.h"
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/subscriptioncmds.h"
36 #include "executor/executor.h"
38 #include "nodes/makefuncs.h"
40 #include "replication/logicallauncher.h"
41 #include "replication/origin.h"
42 #include "replication/walreceiver.h"
43 #include "replication/walsender.h"
44 #include "replication/worker_internal.h"
46 #include "storage/lmgr.h"
48 #include "utils/builtins.h"
49 #include "utils/guc.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
/* Prototype only; the definition of fetch_table_list appears later in this file. */
54 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
/*
 * parse_subscription_options -- walk the DefElem list in "options" and store
 * each recognized setting through the matching output pointer.
 *
 * Option names compared below: connect, enabled, create_slot, slot_name,
 * copy_data, synchronous_commit, refresh.  A name only matches when its
 * output pointer is non-NULL; the last message in the chain is
 * "unrecognized subscription parameter".  The visible duplicate checks
 * (create_slot, slot_name, synchronous_commit) raise "conflicting or
 * redundant options".
 *
 * After the scan two cross-option checks are visible:
 *   - connect given as false: enabled, create_slot and copy_data must not
 *     have been explicitly set to true, and *create_slot is forced to false;
 *   - slot_name given as NONE (stored as a NULL *slot_name): enabled and
 *     create_slot must not be true, whether given explicitly or left at
 *     their current value.
 *
 * NOTE(review): several lines of this function (braces, ereport openers,
 * some default assignments) are not visible in this listing; the comments
 * added here describe only the visible statements.
 */
57 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
59 * Since not all options can be specified in both commands, this function
60 * will report an error on options if the target output pointer is NULL to
64 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
65 bool *enabled, bool *create_slot,
66 bool *slot_name_given, char **slot_name,
67 bool *copy_data, char **synchronous_commit,
/* Local "given" flags for options whose presence is not reported back to the caller. */
71 bool connect_given = false;
72 bool create_slot_given = false;
73 bool copy_data_given = false;
74 bool refresh_given = false;
76 /* If connect is specified, the others also need to be. */
77 Assert(!connect || (enabled && create_slot && copy_data));
/* Reset caller-visible outputs before scanning the option list. */
83 *enabled_given = false;
90 *slot_name_given = false;
95 if (synchronous_commit)
96 *synchronous_commit = NULL;
103 DefElem *defel = (DefElem *) lfirst(lc);
/* Each branch below requires both a name match and a non-NULL output pointer. */
105 if (strcmp(defel->defname, "connect") == 0 && connect)
109 (errcode(ERRCODE_SYNTAX_ERROR),
110 errmsg("conflicting or redundant options")));
112 connect_given = true;
113 *connect = defGetBoolean(defel);
115 else if (strcmp(defel->defname, "enabled") == 0 && enabled)
119 (errcode(ERRCODE_SYNTAX_ERROR),
120 errmsg("conflicting or redundant options")));
122 *enabled_given = true;
123 *enabled = defGetBoolean(defel);
125 else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
127 if (create_slot_given)
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("conflicting or redundant options")));
132 create_slot_given = true;
133 *create_slot = defGetBoolean(defel);
135 else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
137 if (*slot_name_given)
139 (errcode(ERRCODE_SYNTAX_ERROR),
140 errmsg("conflicting or redundant options")));
142 *slot_name_given = true;
143 *slot_name = defGetString(defel);
145 /* Setting slot_name = NONE is treated as no slot name. */
146 if (strcmp(*slot_name, "none") == 0)
149 else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
153 (errcode(ERRCODE_SYNTAX_ERROR),
154 errmsg("conflicting or redundant options")));
156 copy_data_given = true;
157 *copy_data = defGetBoolean(defel);
159 else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
/* A non-NULL *synchronous_commit means the option was already seen. */
162 if (*synchronous_commit)
164 (errcode(ERRCODE_SYNTAX_ERROR),
165 errmsg("conflicting or redundant options")));
167 *synchronous_commit = defGetString(defel);
169 /* Test if the given value is valid for synchronous_commit GUC. */
170 (void) set_config_option("synchronous_commit", *synchronous_commit,
171 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
174 else if (strcmp(defel->defname, "refresh") == 0 && refresh)
178 (errcode(ERRCODE_SYNTAX_ERROR),
179 errmsg("conflicting or redundant options")));
181 refresh_given = true;
182 *refresh = defGetBoolean(defel);
186 (errcode(ERRCODE_SYNTAX_ERROR),
187 errmsg("unrecognized subscription parameter: %s", defel->defname)));
191 * We've been explicitly asked to not connect, that requires some
192 * additional processing.
194 if (connect && !*connect)
196 /* Check for incompatible options from the user. */
197 if (enabled && *enabled_given && *enabled)
199 (errcode(ERRCODE_SYNTAX_ERROR),
200 errmsg("connect = false and enabled = true are mutually exclusive options")));
202 if (create_slot && create_slot_given && *create_slot)
204 (errcode(ERRCODE_SYNTAX_ERROR),
205 errmsg("connect = false and create_slot = true are mutually exclusive options")));
207 if (copy_data && copy_data_given && *copy_data)
209 (errcode(ERRCODE_SYNTAX_ERROR),
210 errmsg("connect = false and copy_data = true are mutually exclusive options")));
212 /* Change the defaults of other options. */
214 *create_slot = false;
219 * Do additional checking for disallowed combination when slot_name = NONE
222 if (slot_name && *slot_name_given && !*slot_name)
224 if (enabled && *enabled_given && *enabled)
226 (errcode(ERRCODE_SYNTAX_ERROR),
227 errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
229 if (create_slot && create_slot_given && *create_slot)
231 (errcode(ERRCODE_SYNTAX_ERROR),
232 errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
234 if (enabled && !*enabled_given && *enabled)
236 (errcode(ERRCODE_SYNTAX_ERROR),
237 errmsg("subscription with slot_name = NONE must also set enabled = false")));
239 if (create_slot && !create_slot_given && *create_slot)
241 (errcode(ERRCODE_SYNTAX_ERROR),
242 errmsg("subscription with slot_name = NONE must also set create_slot = false")));
/*
 * publicationListToArray -- convert a List of String nodes into a text[]
 * Datum.
 *
 * Visible steps: a temporary AllocSet context is created and entered, one
 * text Datum per list entry is built with CStringGetTextDatum, the previous
 * context is restored, construct_array builds the TEXTOID array, and the
 * temporary context is deleted before the array pointer is returned.
 *
 * A name that compares equal (strcmp) to another entry is reported as
 * "publication name ... used more than once".
 * NOTE(review): the inner loop presumably stops once it reaches the current
 * cell; that statement is not visible in this listing -- confirm.
 */
247 * Auxiliary function to build a text array out of a list of String nodes.
250 publicationListToArray(List *publist)
256 MemoryContext memcxt;
257 MemoryContext oldcxt;
259 /* Create memory context for temporary allocations. */
260 memcxt = AllocSetContextCreate(CurrentMemoryContext,
261 "publicationListToArray to array",
262 ALLOCSET_DEFAULT_SIZES);
263 oldcxt = MemoryContextSwitchTo(memcxt);
/* One Datum slot per list entry; filled through index j below. */
265 datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
267 foreach(cell, publist)
269 char *name = strVal(lfirst(cell));
272 /* Check for duplicates. */
273 foreach(pcell, publist)
275 char *pname = strVal(lfirst(pcell));
280 if (strcmp(name, pname) == 0)
282 (errcode(ERRCODE_SYNTAX_ERROR),
283 errmsg("publication name \"%s\" used more than once",
287 datums[j++] = CStringGetTextDatum(name);
/* Switch back before building the result; presumably so the array outlives memcxt. */
290 MemoryContextSwitchTo(oldcxt);
292 arr = construct_array(datums, list_length(publist),
293 TEXTOID, -1, false, 'i');
295 MemoryContextDelete(memcxt);
297 return PointerGetDatum(arr);
/*
 * CreateSubscription -- implement CREATE SUBSCRIPTION.
 *
 * stmt        parsed statement (subname, conninfo, publication, options)
 * isTopLevel  passed on to PreventInTransactionBlock
 *
 * Visible flow:
 *  - parse the WITH options via parse_subscription_options;
 *  - refuse to run inside a transaction block for the create_slot case and
 *    reject callers that fail the superuser requirement;
 *  - reject a name already used in the current database;
 *  - default the slot name to the subscription name and synchronous_commit
 *    to "off";
 *  - validate conninfo, insert the pg_subscription row, record the owner
 *    dependency and create the "pg_<subid>" replication origin;
 *  - when connecting: fetch the published tables, register each one with
 *    SetSubscriptionRelState and create the slot on the publisher if asked;
 *    the "tables were not subscribed" message covers the other case;
 *  - wake the apply launcher at commit and fire the post-create hook.
 *
 * NOTE(review): braces, several declarations and the ereport openers are not
 * visible in this listing; comments describe the visible statements only.
 */
301 * Create new subscription.
304 CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
307 ObjectAddress myself;
309 bool nulls[Natts_pg_subscription];
310 Datum values[Natts_pg_subscription];
311 Oid owner = GetUserId();
317 char *synchronous_commit;
321 char originname[NAMEDATALEN];
326 * Parse and check options.
328 * Connection and publication should not be specified here.
/* copy_data is a local read by value further down, so its address is passed here. */
330 parse_subscription_options(stmt->options, &connect, &enabled_given,
331 &enabled, &create_slot, &slotname_given,
332 &slotname, &copy_data, &synchronous_commit,
336 * Since creating a replication slot is not transactional, rolling back
337 * the transaction leaves the created replication slot. So we cannot run
338 * CREATE SUBSCRIPTION inside a transaction block if creating a
342 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
346 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347 (errmsg("must be superuser to create subscriptions"))));
349 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
351 /* Check if name is used */
352 subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
353 CStringGetDatum(stmt->subname));
354 if (OidIsValid(subid))
357 (errcode(ERRCODE_DUPLICATE_OBJECT),
358 errmsg("subscription \"%s\" already exists",
/* No slot_name option at all: fall back to the subscription name. */
362 if (!slotname_given && slotname == NULL)
363 slotname = stmt->subname;
365 /* The default for synchronous_commit of subscriptions is off. */
366 if (synchronous_commit == NULL)
367 synchronous_commit = "off";
369 conninfo = stmt->conninfo;
370 publications = stmt->publication;
372 /* Load the library providing us libpq calls. */
373 load_file("libpqwalreceiver", false);
375 /* Check the connection info string. */
376 walrcv_check_conninfo(conninfo);
378 /* Everything ok, form a new tuple. */
379 memset(values, 0, sizeof(values));
380 memset(nulls, false, sizeof(nulls));
382 values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
383 values[Anum_pg_subscription_subname - 1] =
384 DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
385 values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
386 values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
387 values[Anum_pg_subscription_subconninfo - 1] =
388 CStringGetTextDatum(conninfo);
/* subslotname is either stored as a name or marked NULL (selecting condition not visible here). */
390 values[Anum_pg_subscription_subslotname - 1] =
391 DirectFunctionCall1(namein, CStringGetDatum(slotname));
393 nulls[Anum_pg_subscription_subslotname - 1] = true;
394 values[Anum_pg_subscription_subsynccommit - 1] =
395 CStringGetTextDatum(synchronous_commit);
396 values[Anum_pg_subscription_subpublications - 1] =
397 publicationListToArray(publications);
399 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
401 /* Insert tuple into catalog. */
402 subid = CatalogTupleInsert(rel, tup);
405 recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
/* The replication origin is named after the new subscription OID. */
407 snprintf(originname, sizeof(originname), "pg_%u", subid);
408 replorigin_create(originname);
411 * Connect to remote side to execute requested commands and fetch table
418 WalReceiverConn *wrconn;
423 /* Try to connect to the publisher. */
424 wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
427 (errmsg("could not connect to the publisher: %s", err)));
432 * Set sync state based on if we were asked to do data copy or
435 table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
438 * Get the table list from publisher and build local table status
441 tables = fetch_table_list(wrconn, publications);
444 RangeVar *rv = (RangeVar *) lfirst(lc);
447 relid = RangeVarGetRelid(rv, AccessShareLock, false);
449 /* Check for supported relkind. */
450 CheckSubscriptionRelkind(get_rel_relkind(relid),
451 rv->schemaname, rv->relname);
453 SetSubscriptionRelState(subid, relid, table_state,
454 InvalidXLogRecPtr, false);
458 * If requested, create permanent slot for the subscription. We
459 * won't use the initial snapshot for anything, so no need to
466 walrcv_create_slot(wrconn, slotname, false,
467 CRS_NOEXPORT_SNAPSHOT, &lsn);
469 (errmsg("created replication slot \"%s\" on publisher",
475 /* Close the connection in case of failure. */
476 walrcv_disconnect(wrconn);
481 /* And we are done with the remote side. */
482 walrcv_disconnect(wrconn);
486 (errmsg("tables were not subscribed, you will have to run "
487 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
488 "subscribe the tables")));
490 heap_close(rel, RowExclusiveLock);
493 ApplyLauncherWakeupAtCommit();
495 ObjectAddressSet(myself, SubscriptionRelationId, subid);
497 InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
/*
 * AlterSubscription_refresh -- bring the locally recorded table list of a
 * subscription in line with what the publisher reports.
 *
 * sub        subscription to refresh; conninfo, name, publications and oid
 *            are read from it
 * copy_data  selects the initial state of newly added tables:
 *            SUBREL_STATE_INIT when true, SUBREL_STATE_READY otherwise
 *
 * Tables reported by the publisher but not recorded locally are added.
 * Tables recorded locally but absent from the publisher list are removed and
 * logicalrep_worker_stop_at_commit is called for them.  Both lookups use
 * qsort + bsearch over Oid arrays with oid_cmp.
 */
503 AlterSubscription_refresh(Subscription *sub, bool copy_data)
508 Oid *subrel_local_oids;
509 Oid *pubrel_local_oids;
513 /* Load the library providing us libpq calls. */
514 load_file("libpqwalreceiver", false);
516 /* Try to connect to the publisher. */
517 wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
520 (errmsg("could not connect to the publisher: %s", err)));
522 /* Get the table list from publisher. */
523 pubrel_names = fetch_table_list(wrconn, sub->publications);
525 /* We are done with the remote side, close connection. */
526 walrcv_disconnect(wrconn);
528 /* Get local table list. */
529 subrel_states = GetSubscriptionRelations(sub->oid);
532 * Build qsorted array of local table oids for faster lookup. This can
533 * potentially contain all tables in the database so speed of lookup is
536 subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
538 foreach(lc, subrel_states)
540 SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
542 subrel_local_oids[off++] = relstate->relid;
544 qsort(subrel_local_oids, list_length(subrel_states),
545 sizeof(Oid), oid_cmp);
548 * Walk over the remote tables and try to match them to locally known
549 * tables. If the table is not known locally create a new state for it.
551 * Also builds array of local oids of remote tables for the next step.
554 pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
556 foreach(lc, pubrel_names)
558 RangeVar *rv = (RangeVar *) lfirst(lc);
/* Resolve the remote name to a local relation OID. */
561 relid = RangeVarGetRelid(rv, AccessShareLock, false);
563 /* Check for supported relkind. */
564 CheckSubscriptionRelkind(get_rel_relkind(relid),
565 rv->schemaname, rv->relname);
567 pubrel_local_oids[off++] = relid;
/* Not found among the locally recorded relations: register it. */
569 if (!bsearch(&relid, subrel_local_oids,
570 list_length(subrel_states), sizeof(Oid), oid_cmp))
572 SetSubscriptionRelState(sub->oid, relid,
573 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
574 InvalidXLogRecPtr, false);
576 (errmsg("table \"%s.%s\" added to subscription \"%s\"",
577 rv->schemaname, rv->relname, sub->name)));
582 * Next remove state for tables we should not care about anymore using the
583 * data we collected above
/* pubrel_local_oids was filled in list order; sort it so bsearch can be used. */
585 qsort(pubrel_local_oids, list_length(pubrel_names),
586 sizeof(Oid), oid_cmp);
588 for (off = 0; off < list_length(subrel_states); off++)
590 Oid relid = subrel_local_oids[off];
/* Locally recorded relation is absent from the publisher list: drop it. */
592 if (!bsearch(&relid, pubrel_local_oids,
593 list_length(pubrel_names), sizeof(Oid), oid_cmp))
595 RemoveSubscriptionRel(sub->oid, relid);
597 logicalrep_worker_stop_at_commit(sub->oid, relid);
600 (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
601 get_namespace_name(get_rel_namespace(relid)),
/*
 * AlterSubscription -- implement ALTER SUBSCRIPTION.
 *
 * The subscription is looked up by name in the current database, ownership
 * is checked, and the object is locked with AccessExclusiveLock.  The
 * visible case labels then select what happens:
 *
 *   ALTER_SUBSCRIPTION_OPTIONS      slot_name and/or synchronous_commit
 *   ALTER_SUBSCRIPTION_ENABLED      subenabled; ApplyLauncherWakeupAtCommit
 *                                   is called in this branch
 *   ALTER_SUBSCRIPTION_CONNECTION   subconninfo, after validating the string
 *   ALTER_SUBSCRIPTION_PUBLICATION  subpublications, optionally followed by
 *                                   AlterSubscription_refresh
 *   ALTER_SUBSCRIPTION_REFRESH      AlterSubscription_refresh only
 *
 * Columns to change are flagged in replaces[] and applied through
 * heap_modify_tuple and CatalogTupleUpdate; the post-alter hook is invoked
 * at the end.
 *
 * NOTE(review): braces, break statements and ereport openers are not visible
 * in this listing; comments describe the visible statements only.
 */
609 * Alter the existing subscription.
612 AlterSubscription(AlterSubscriptionStmt *stmt)
615 ObjectAddress myself;
616 bool nulls[Natts_pg_subscription];
617 bool replaces[Natts_pg_subscription];
618 Datum values[Natts_pg_subscription];
621 bool update_tuple = false;
624 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
626 /* Fetch the existing tuple. */
627 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
628 CStringGetDatum(stmt->subname));
630 if (!HeapTupleIsValid(tup))
632 (errcode(ERRCODE_UNDEFINED_OBJECT),
633 errmsg("subscription \"%s\" does not exist",
637 if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
638 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
641 subid = HeapTupleGetOid(tup);
642 sub = GetSubscription(subid, false);
644 /* Lock the subscription so nobody else can do anything with it. */
645 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
647 /* Form a new tuple. */
648 memset(values, 0, sizeof(values));
649 memset(nulls, false, sizeof(nulls));
650 memset(replaces, false, sizeof(replaces));
654 case ALTER_SUBSCRIPTION_OPTIONS:
658 char *synchronous_commit;
/* Only slot_name and synchronous_commit are accepted in this form. */
660 parse_subscription_options(stmt->options, NULL, NULL, NULL,
661 NULL, &slotname_given, &slotname,
662 NULL, &synchronous_commit, NULL);
666 if (sub->enabled && !slotname)
668 (errcode(ERRCODE_SYNTAX_ERROR),
669 errmsg("cannot set slot_name = NONE for enabled subscription")));
672 values[Anum_pg_subscription_subslotname - 1] =
673 DirectFunctionCall1(namein, CStringGetDatum(slotname));
675 nulls[Anum_pg_subscription_subslotname - 1] = true;
676 replaces[Anum_pg_subscription_subslotname - 1] = true;
679 if (synchronous_commit)
681 values[Anum_pg_subscription_subsynccommit - 1] =
682 CStringGetTextDatum(synchronous_commit);
683 replaces[Anum_pg_subscription_subsynccommit - 1] = true;
690 case ALTER_SUBSCRIPTION_ENABLED:
695 parse_subscription_options(stmt->options, NULL,
696 &enabled_given, &enabled, NULL,
697 NULL, NULL, NULL, NULL, NULL);
698 Assert(enabled_given);
700 if (!sub->slotname && enabled)
702 (errcode(ERRCODE_SYNTAX_ERROR),
703 errmsg("cannot enable subscription that does not have a slot name")));
705 values[Anum_pg_subscription_subenabled - 1] =
706 BoolGetDatum(enabled);
707 replaces[Anum_pg_subscription_subenabled - 1] = true;
710 ApplyLauncherWakeupAtCommit();
716 case ALTER_SUBSCRIPTION_CONNECTION:
717 /* Load the library providing us libpq calls. */
718 load_file("libpqwalreceiver", false);
719 /* Check the connection info string. */
720 walrcv_check_conninfo(stmt->conninfo);
722 values[Anum_pg_subscription_subconninfo - 1] =
723 CStringGetTextDatum(stmt->conninfo);
724 replaces[Anum_pg_subscription_subconninfo - 1] = true;
728 case ALTER_SUBSCRIPTION_PUBLICATION:
/* copy_data is handed to AlterSubscription_refresh by value, so its address is passed here. */
733 parse_subscription_options(stmt->options, NULL, NULL, NULL,
734 NULL, NULL, NULL, &copy_data,
737 values[Anum_pg_subscription_subpublications - 1] =
738 publicationListToArray(stmt->publication);
739 replaces[Anum_pg_subscription_subpublications - 1] = true;
743 /* Refresh if user asked us to. */
748 (errcode(ERRCODE_SYNTAX_ERROR),
749 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
750 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
752 /* Make sure refresh sees the new list of publications. */
753 sub->publications = stmt->publication;
755 AlterSubscription_refresh(sub, copy_data);
761 case ALTER_SUBSCRIPTION_REFRESH:
767 (errcode(ERRCODE_SYNTAX_ERROR),
768 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
770 parse_subscription_options(stmt->options, NULL, NULL, NULL,
771 NULL, NULL, NULL, &copy_data,
774 AlterSubscription_refresh(sub, copy_data);
780 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
784 /* Update the catalog if needed. */
787 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
790 CatalogTupleUpdate(rel, &tup->t_self, tup);
795 heap_close(rel, RowExclusiveLock);
797 ObjectAddressSet(myself, SubscriptionRelationId, subid);
799 InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/*
 * DropSubscription -- implement DROP SUBSCRIPTION.
 *
 * stmt        parsed statement (subname, missing_ok)
 * isTopLevel  passed on to PreventInTransactionBlock
 *
 * Visible flow:
 *  - open pg_subscription with AccessExclusiveLock and look the name up;
 *    a missing subscription is an error unless stmt->missing_ok is set, in
 *    which case a "does not exist, skipping" message is emitted;
 *  - check ownership, fire the drop hook, lock the subscription object;
 *  - copy subname, conninfo and slot name out of the tuple;
 *  - call PreventInTransactionBlock for the slot-dropping case;
 *  - delete the catalog row, stop the workers, remove shared dependency
 *    records, relation sync states and the "pg_<subid>" replication origin;
 *  - connect to the publisher and send DROP_REPLICATION_SLOT ... WAIT,
 *    reporting connection or command failures.
 *
 * NOTE(review): braces, return statements and ereport openers are not
 * visible in this listing; comments describe the visible statements only.
 */
805 * Drop a subscription
808 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
811 ObjectAddress myself;
821 char originname[NAMEDATALEN];
823 RepOriginId originid;
824 WalReceiverConn *wrconn = NULL;
828 * Lock pg_subscription with AccessExclusiveLock to ensure that the
829 * launcher doesn't restart new worker during dropping the subscription
831 rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
833 tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
834 CStringGetDatum(stmt->subname));
836 if (!HeapTupleIsValid(tup))
838 heap_close(rel, NoLock);
840 if (!stmt->missing_ok)
842 (errcode(ERRCODE_UNDEFINED_OBJECT),
843 errmsg("subscription \"%s\" does not exist",
847 (errmsg("subscription \"%s\" does not exist, skipping",
853 subid = HeapTupleGetOid(tup);
856 if (!pg_subscription_ownercheck(subid, GetUserId()))
857 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
860 /* DROP hook for the subscription being removed */
861 InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
864 * Lock the subscription so nobody else can do anything with it (including
865 * the replication workers).
867 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
870 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
871 Anum_pg_subscription_subname, &isnull);
873 subname = pstrdup(NameStr(*DatumGetName(datum)));
876 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
877 Anum_pg_subscription_subconninfo, &isnull);
879 conninfo = TextDatumGetCString(datum);
882 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
883 Anum_pg_subscription_subslotname, &isnull);
885 slotname = pstrdup(NameStr(*DatumGetName(datum)));
890 * Since dropping a replication slot is not transactional, the replication
891 * slot stays dropped even if the transaction rolls back. So we cannot
892 * run DROP SUBSCRIPTION inside a transaction block if dropping the
895 * XXX The command name should really be something like "DROP SUBSCRIPTION
896 * of a subscription that is associated with a replication slot", but we
897 * don't have the proper facilities for that.
900 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
/* Report the object to event triggers before the catalog row is removed. */
903 ObjectAddressSet(myself, SubscriptionRelationId, subid);
904 EventTriggerSQLDropAddObject(&myself, true, true);
906 /* Remove the tuple from catalog. */
907 CatalogTupleDelete(rel, &tup->t_self);
909 ReleaseSysCache(tup);
912 * Stop all the subscription workers immediately.
914 * This is necessary if we are dropping the replication slot, so that the
915 * slot becomes accessible.
917 * It is also necessary if the subscription is disabled and was disabled
918 * in the same transaction. Then the workers haven't seen the disabling
919 * yet and will still be running, leading to hangs later when we want to
920 * drop the replication origin. If the subscription was disabled before
921 * this transaction, then there shouldn't be any workers left, so this
922 * won't make a difference.
924 * New workers won't be started because we hold an exclusive lock on the
925 * subscription till the end of the transaction.
/* The worker list is collected under LogicalRepWorkerLock; the stop calls run after it is released. */
927 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
928 subworkers = logicalrep_workers_find(subid, false);
929 LWLockRelease(LogicalRepWorkerLock);
930 foreach(lc, subworkers)
932 LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
934 logicalrep_worker_stop(w->subid, w->relid);
936 list_free(subworkers);
938 /* Clean up dependencies */
939 deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
941 /* Remove any associated relation synchronization states. */
942 RemoveSubscriptionRel(subid, InvalidOid);
944 /* Remove the origin tracking if exists. */
945 snprintf(originname, sizeof(originname), "pg_%u", subid);
946 originid = replorigin_by_name(originname, true);
947 if (originid != InvalidRepOriginId)
948 replorigin_drop(originid, false);
951 * If there is no slot associated with the subscription, we can finish
956 heap_close(rel, NoLock);
961 * Otherwise drop the replication slot at the publisher node using the
962 * replication connection.
964 load_file("libpqwalreceiver", false);
/* The slot name goes through quote_identifier before being placed in the command text. */
966 initStringInfo(&cmd);
967 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
969 wrconn = walrcv_connect(conninfo, true, subname, &err);
972 (errmsg("could not connect to publisher when attempting to "
973 "drop the replication slot \"%s\"", slotname),
974 errdetail("The error was: %s", err),
975 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
976 "to disassociate the subscription from the slot.")));
980 WalRcvExecResult *res;
/* No result columns are requested for this command (0, NULL). */
982 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
984 if (res->status != WALRCV_OK_COMMAND)
986 (errmsg("could not drop the replication slot \"%s\" on publisher",
988 errdetail("The error was: %s", res->err)));
991 (errmsg("dropped replication slot \"%s\" on publisher",
994 walrcv_clear_result(res);
998 /* Close the connection in case of failure */
999 walrcv_disconnect(wrconn);
1004 walrcv_disconnect(wrconn);
1008 heap_close(rel, NoLock);
/*
 * AlterSubscriptionOwner_internal
 *
 * rel         open pg_subscription relation used for the update
 * tup         subscription tuple; its subowner field is overwritten in place
 * newOwnerId  role that is to become the owner
 *
 * Checks visible below: the current owner is compared with newOwnerId
 * (NOTE(review): the statement under that test is not visible here,
 * presumably an early return -- confirm); the current user must pass
 * pg_subscription_ownercheck; the new owner must satisfy superuser_arg.
 * Afterwards the tuple is updated, the owner dependency is switched, and the
 * post-alter hook is invoked.
 */
1012 * Internal workhorse for changing a subscription owner
1015 AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1017 Form_pg_subscription form;
1019 form = (Form_pg_subscription) GETSTRUCT(tup);
1021 if (form->subowner == newOwnerId)
1024 if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1025 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1026 NameStr(form->subname));
1028 /* New owner must be a superuser */
1029 if (!superuser_arg(newOwnerId))
1031 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1032 errmsg("permission denied to change owner of subscription \"%s\"",
1033 NameStr(form->subname)),
1034 errhint("The owner of a subscription must be a superuser.")));
/* The tuple is modified in place through form and then written back. */
1036 form->subowner = newOwnerId;
1037 CatalogTupleUpdate(rel, &tup->t_self, tup);
1039 /* Update owner dependency reference */
1040 changeDependencyOnOwner(SubscriptionRelationId,
1041 HeapTupleGetOid(tup),
1044 InvokeObjectPostAlterHook(SubscriptionRelationId,
1045 HeapTupleGetOid(tup), 0);
/*
 * AlterSubscriptionOwner
 *
 * name        subscription name, looked up within the current database
 * newOwnerId  role that is to become the owner
 *
 * Opens pg_subscription with RowExclusiveLock, fetches a copy of the tuple,
 * raises "subscription ... does not exist" when the lookup fails, delegates
 * the change to AlterSubscriptionOwner_internal, fills in the ObjectAddress
 * for the subscription, then frees the tuple copy and closes the relation.
 */
1049 * Change subscription owner -- by name
1052 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1057 ObjectAddress address;
1059 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1061 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1062 CStringGetDatum(name));
1064 if (!HeapTupleIsValid(tup))
1066 (errcode(ERRCODE_UNDEFINED_OBJECT),
1067 errmsg("subscription \"%s\" does not exist", name)));
/* Remember the OID before delegating; it is used for the ObjectAddress below. */
1069 subid = HeapTupleGetOid(tup);
1071 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1073 ObjectAddressSet(address, SubscriptionRelationId, subid);
1075 heap_freetuple(tup);
1077 heap_close(rel, RowExclusiveLock);
/*
 * AlterSubscriptionOwner_oid
 *
 * subid       OID of the subscription to change
 * newOwnerId  role that is to become the owner
 *
 * Same sequence as the by-name variant, except that the tuple is fetched
 * through the SUBSCRIPTIONOID cache and no ObjectAddress is built in the
 * visible lines.  A failed lookup raises "subscription with OID ... does not
 * exist".
 */
1083 * Change subscription owner -- by OID
1086 AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1091 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1093 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1095 if (!HeapTupleIsValid(tup))
1097 (errcode(ERRCODE_UNDEFINED_OBJECT),
1098 errmsg("subscription with OID %u does not exist", subid)));
1100 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
/* The tuple came from SearchSysCacheCopy1, so it is freed rather than released. */
1102 heap_freetuple(tup);
1104 heap_close(rel, RowExclusiveLock);
/*
 * fetch_table_list
 *
 * wrconn        open connection to the publisher
 * publications  non-empty List of String nodes naming publications
 *               (asserted below)
 *
 * Builds a SELECT DISTINCT over pg_catalog.pg_publication_tables whose IN
 * list holds each publication name passed through quote_literal_cstr, runs
 * it with walrcv_exec expecting two TEXT columns, and turns every returned
 * row into a RangeVar (schema name, table name) appended to tablelist.
 * A status other than WALRCV_OK_TUPLES is reported as "could not receive
 * list of replicated tables from the publisher".
 *
 * NOTE(review): the tail of the function (the return statement and any
 * cleanup of cmd) is not visible in this listing.
 */
1108 * Get the list of tables which belong to specified publications on the
1109 * publisher connection.
1112 fetch_table_list(WalReceiverConn *wrconn, List *publications)
1114 WalRcvExecResult *res;
1116 TupleTableSlot *slot;
/* Expected result column types: schema name and table name, both text. */
1117 Oid tableRow[2] = {TEXTOID, TEXTOID};
1120 List *tablelist = NIL;
1122 Assert(list_length(publications) > 0);
1124 initStringInfo(&cmd);
1125 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1126 " FROM pg_catalog.pg_publication_tables t\n"
1127 " WHERE t.pubname IN (");
1129 foreach(lc, publications)
1131 char *pubname = strVal(lfirst(lc));
/* Separator between IN-list entries (the guarding condition is not visible here). */
1136 appendStringInfoString(&cmd, ", ");
1138 appendStringInfoString(&cmd, quote_literal_cstr(pubname));
1140 appendStringInfoChar(&cmd, ')');
1142 res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1145 if (res->status != WALRCV_OK_TUPLES)
1147 (errmsg("could not receive list of replicated tables from the publisher: %s",
1150 /* Process tables. */
1151 slot = MakeSingleTupleTableSlot(res->tupledesc);
1152 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1159 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1161 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
/* The RangeVar gets its own copies of both strings. */
1164 rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1165 tablelist = lappend(tablelist, rv);
1167 ExecClearTuple(slot);
1169 ExecDropSingleTupleTableSlot(slot);
1171 walrcv_clear_result(res);