]> granicus.if.org Git - postgresql/blob - src/backend/commands/subscriptioncmds.c
Rename TransactionChain functions
[postgresql] / src / backend / commands / subscriptioncmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * subscriptioncmds.c
4  *              subscription catalog manipulation functions
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *              subscriptioncmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/xact.h"
22
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
29 #include "catalog/pg_subscription.h"
30 #include "catalog/pg_subscription_rel.h"
31
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/subscriptioncmds.h"
35
36 #include "executor/executor.h"
37
38 #include "nodes/makefuncs.h"
39
40 #include "replication/logicallauncher.h"
41 #include "replication/origin.h"
42 #include "replication/walreceiver.h"
43 #include "replication/walsender.h"
44 #include "replication/worker_internal.h"
45
46 #include "storage/lmgr.h"
47
48 #include "utils/builtins.h"
49 #include "utils/guc.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
53
54 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
55
56 /*
57  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
58  *
59  * Since not all options can be specified in both commands, this function
60  * will report an error on options if the target output pointer is NULL to
61  * accommodate that.
62  */
63 static void
64 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
65                                                    bool *enabled, bool *create_slot,
66                                                    bool *slot_name_given, char **slot_name,
67                                                    bool *copy_data, char **synchronous_commit,
68                                                    bool *refresh)
69 {
70         ListCell   *lc;
71         bool            connect_given = false;
72         bool            create_slot_given = false;
73         bool            copy_data_given = false;
74         bool            refresh_given = false;
75
76         /* If connect is specified, the others also need to be. */
77         Assert(!connect || (enabled && create_slot && copy_data));
78
79         if (connect)
80                 *connect = true;
81         if (enabled)
82         {
83                 *enabled_given = false;
84                 *enabled = true;
85         }
86         if (create_slot)
87                 *create_slot = true;
88         if (slot_name)
89         {
90                 *slot_name_given = false;
91                 *slot_name = NULL;
92         }
93         if (copy_data)
94                 *copy_data = true;
95         if (synchronous_commit)
96                 *synchronous_commit = NULL;
97         if (refresh)
98                 *refresh = true;
99
100         /* Parse options */
101         foreach(lc, options)
102         {
103                 DefElem    *defel = (DefElem *) lfirst(lc);
104
105                 if (strcmp(defel->defname, "connect") == 0 && connect)
106                 {
107                         if (connect_given)
108                                 ereport(ERROR,
109                                                 (errcode(ERRCODE_SYNTAX_ERROR),
110                                                  errmsg("conflicting or redundant options")));
111
112                         connect_given = true;
113                         *connect = defGetBoolean(defel);
114                 }
115                 else if (strcmp(defel->defname, "enabled") == 0 && enabled)
116                 {
117                         if (*enabled_given)
118                                 ereport(ERROR,
119                                                 (errcode(ERRCODE_SYNTAX_ERROR),
120                                                  errmsg("conflicting or redundant options")));
121
122                         *enabled_given = true;
123                         *enabled = defGetBoolean(defel);
124                 }
125                 else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
126                 {
127                         if (create_slot_given)
128                                 ereport(ERROR,
129                                                 (errcode(ERRCODE_SYNTAX_ERROR),
130                                                  errmsg("conflicting or redundant options")));
131
132                         create_slot_given = true;
133                         *create_slot = defGetBoolean(defel);
134                 }
135                 else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
136                 {
137                         if (*slot_name_given)
138                                 ereport(ERROR,
139                                                 (errcode(ERRCODE_SYNTAX_ERROR),
140                                                  errmsg("conflicting or redundant options")));
141
142                         *slot_name_given = true;
143                         *slot_name = defGetString(defel);
144
145                         /* Setting slot_name = NONE is treated as no slot name. */
146                         if (strcmp(*slot_name, "none") == 0)
147                                 *slot_name = NULL;
148                 }
149                 else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
150                 {
151                         if (copy_data_given)
152                                 ereport(ERROR,
153                                                 (errcode(ERRCODE_SYNTAX_ERROR),
154                                                  errmsg("conflicting or redundant options")));
155
156                         copy_data_given = true;
157                         *copy_data = defGetBoolean(defel);
158                 }
159                 else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
160                                  synchronous_commit)
161                 {
162                         if (*synchronous_commit)
163                                 ereport(ERROR,
164                                                 (errcode(ERRCODE_SYNTAX_ERROR),
165                                                  errmsg("conflicting or redundant options")));
166
167                         *synchronous_commit = defGetString(defel);
168
169                         /* Test if the given value is valid for synchronous_commit GUC. */
170                         (void) set_config_option("synchronous_commit", *synchronous_commit,
171                                                                          PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
172                                                                          false, 0, false);
173                 }
174                 else if (strcmp(defel->defname, "refresh") == 0 && refresh)
175                 {
176                         if (refresh_given)
177                                 ereport(ERROR,
178                                                 (errcode(ERRCODE_SYNTAX_ERROR),
179                                                  errmsg("conflicting or redundant options")));
180
181                         refresh_given = true;
182                         *refresh = defGetBoolean(defel);
183                 }
184                 else
185                         ereport(ERROR,
186                                         (errcode(ERRCODE_SYNTAX_ERROR),
187                                          errmsg("unrecognized subscription parameter: %s", defel->defname)));
188         }
189
190         /*
191          * We've been explicitly asked to not connect, that requires some
192          * additional processing.
193          */
194         if (connect && !*connect)
195         {
196                 /* Check for incompatible options from the user. */
197                 if (enabled && *enabled_given && *enabled)
198                         ereport(ERROR,
199                                         (errcode(ERRCODE_SYNTAX_ERROR),
200                                          errmsg("connect = false and enabled = true are mutually exclusive options")));
201
202                 if (create_slot && create_slot_given && *create_slot)
203                         ereport(ERROR,
204                                         (errcode(ERRCODE_SYNTAX_ERROR),
205                                          errmsg("connect = false and create_slot = true are mutually exclusive options")));
206
207                 if (copy_data && copy_data_given && *copy_data)
208                         ereport(ERROR,
209                                         (errcode(ERRCODE_SYNTAX_ERROR),
210                                          errmsg("connect = false and copy_data = true are mutually exclusive options")));
211
212                 /* Change the defaults of other options. */
213                 *enabled = false;
214                 *create_slot = false;
215                 *copy_data = false;
216         }
217
218         /*
219          * Do additional checking for disallowed combination when slot_name = NONE
220          * was used.
221          */
222         if (slot_name && *slot_name_given && !*slot_name)
223         {
224                 if (enabled && *enabled_given && *enabled)
225                         ereport(ERROR,
226                                         (errcode(ERRCODE_SYNTAX_ERROR),
227                                          errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
228
229                 if (create_slot && create_slot_given && *create_slot)
230                         ereport(ERROR,
231                                         (errcode(ERRCODE_SYNTAX_ERROR),
232                                          errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
233
234                 if (enabled && !*enabled_given && *enabled)
235                         ereport(ERROR,
236                                         (errcode(ERRCODE_SYNTAX_ERROR),
237                                          errmsg("subscription with slot_name = NONE must also set enabled = false")));
238
239                 if (create_slot && !create_slot_given && *create_slot)
240                         ereport(ERROR,
241                                         (errcode(ERRCODE_SYNTAX_ERROR),
242                                          errmsg("subscription with slot_name = NONE must also set create_slot = false")));
243         }
244 }
245
246 /*
247  * Auxiliary function to build a text array out of a list of String nodes.
248  */
249 static Datum
250 publicationListToArray(List *publist)
251 {
252         ArrayType  *arr;
253         Datum      *datums;
254         int                     j = 0;
255         ListCell   *cell;
256         MemoryContext memcxt;
257         MemoryContext oldcxt;
258
259         /* Create memory context for temporary allocations. */
260         memcxt = AllocSetContextCreate(CurrentMemoryContext,
261                                                                    "publicationListToArray to array",
262                                                                    ALLOCSET_DEFAULT_SIZES);
263         oldcxt = MemoryContextSwitchTo(memcxt);
264
265         datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
266
267         foreach(cell, publist)
268         {
269                 char       *name = strVal(lfirst(cell));
270                 ListCell   *pcell;
271
272                 /* Check for duplicates. */
273                 foreach(pcell, publist)
274                 {
275                         char       *pname = strVal(lfirst(pcell));
276
277                         if (pcell == cell)
278                                 break;
279
280                         if (strcmp(name, pname) == 0)
281                                 ereport(ERROR,
282                                                 (errcode(ERRCODE_SYNTAX_ERROR),
283                                                  errmsg("publication name \"%s\" used more than once",
284                                                                 pname)));
285                 }
286
287                 datums[j++] = CStringGetTextDatum(name);
288         }
289
290         MemoryContextSwitchTo(oldcxt);
291
292         arr = construct_array(datums, list_length(publist),
293                                                   TEXTOID, -1, false, 'i');
294
295         MemoryContextDelete(memcxt);
296
297         return PointerGetDatum(arr);
298 }
299
300 /*
301  * Create new subscription.
302  */
303 ObjectAddress
304 CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
305 {
306         Relation        rel;
307         ObjectAddress myself;
308         Oid                     subid;
309         bool            nulls[Natts_pg_subscription];
310         Datum           values[Natts_pg_subscription];
311         Oid                     owner = GetUserId();
312         HeapTuple       tup;
313         bool            connect;
314         bool            enabled_given;
315         bool            enabled;
316         bool            copy_data;
317         char       *synchronous_commit;
318         char       *conninfo;
319         char       *slotname;
320         bool            slotname_given;
321         char            originname[NAMEDATALEN];
322         bool            create_slot;
323         List       *publications;
324
325         /*
326          * Parse and check options.
327          *
328          * Connection and publication should not be specified here.
329          */
330         parse_subscription_options(stmt->options, &connect, &enabled_given,
331                                                            &enabled, &create_slot, &slotname_given,
332                                                            &slotname, &copy_data, &synchronous_commit,
333                                                            NULL);
334
335         /*
336          * Since creating a replication slot is not transactional, rolling back
337          * the transaction leaves the created replication slot.  So we cannot run
338          * CREATE SUBSCRIPTION inside a transaction block if creating a
339          * replication slot.
340          */
341         if (create_slot)
342                 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
343
344         if (!superuser())
345                 ereport(ERROR,
346                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347                                  (errmsg("must be superuser to create subscriptions"))));
348
349         rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
350
351         /* Check if name is used */
352         subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
353                                                         CStringGetDatum(stmt->subname));
354         if (OidIsValid(subid))
355         {
356                 ereport(ERROR,
357                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
358                                  errmsg("subscription \"%s\" already exists",
359                                                 stmt->subname)));
360         }
361
362         if (!slotname_given && slotname == NULL)
363                 slotname = stmt->subname;
364
365         /* The default for synchronous_commit of subscriptions is off. */
366         if (synchronous_commit == NULL)
367                 synchronous_commit = "off";
368
369         conninfo = stmt->conninfo;
370         publications = stmt->publication;
371
372         /* Load the library providing us libpq calls. */
373         load_file("libpqwalreceiver", false);
374
375         /* Check the connection info string. */
376         walrcv_check_conninfo(conninfo);
377
378         /* Everything ok, form a new tuple. */
379         memset(values, 0, sizeof(values));
380         memset(nulls, false, sizeof(nulls));
381
382         values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
383         values[Anum_pg_subscription_subname - 1] =
384                 DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
385         values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
386         values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
387         values[Anum_pg_subscription_subconninfo - 1] =
388                 CStringGetTextDatum(conninfo);
389         if (slotname)
390                 values[Anum_pg_subscription_subslotname - 1] =
391                         DirectFunctionCall1(namein, CStringGetDatum(slotname));
392         else
393                 nulls[Anum_pg_subscription_subslotname - 1] = true;
394         values[Anum_pg_subscription_subsynccommit - 1] =
395                 CStringGetTextDatum(synchronous_commit);
396         values[Anum_pg_subscription_subpublications - 1] =
397                 publicationListToArray(publications);
398
399         tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
400
401         /* Insert tuple into catalog. */
402         subid = CatalogTupleInsert(rel, tup);
403         heap_freetuple(tup);
404
405         recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
406
407         snprintf(originname, sizeof(originname), "pg_%u", subid);
408         replorigin_create(originname);
409
410         /*
411          * Connect to remote side to execute requested commands and fetch table
412          * info.
413          */
414         if (connect)
415         {
416                 XLogRecPtr      lsn;
417                 char       *err;
418                 WalReceiverConn *wrconn;
419                 List       *tables;
420                 ListCell   *lc;
421                 char            table_state;
422
423                 /* Try to connect to the publisher. */
424                 wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
425                 if (!wrconn)
426                         ereport(ERROR,
427                                         (errmsg("could not connect to the publisher: %s", err)));
428
429                 PG_TRY();
430                 {
431                         /*
432                          * Set sync state based on if we were asked to do data copy or
433                          * not.
434                          */
435                         table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
436
437                         /*
438                          * Get the table list from publisher and build local table status
439                          * info.
440                          */
441                         tables = fetch_table_list(wrconn, publications);
442                         foreach(lc, tables)
443                         {
444                                 RangeVar   *rv = (RangeVar *) lfirst(lc);
445                                 Oid                     relid;
446
447                                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
448
449                                 /* Check for supported relkind. */
450                                 CheckSubscriptionRelkind(get_rel_relkind(relid),
451                                                                                  rv->schemaname, rv->relname);
452
453                                 SetSubscriptionRelState(subid, relid, table_state,
454                                                                                 InvalidXLogRecPtr, false);
455                         }
456
457                         /*
458                          * If requested, create permanent slot for the subscription. We
459                          * won't use the initial snapshot for anything, so no need to
460                          * export it.
461                          */
462                         if (create_slot)
463                         {
464                                 Assert(slotname);
465
466                                 walrcv_create_slot(wrconn, slotname, false,
467                                                                    CRS_NOEXPORT_SNAPSHOT, &lsn);
468                                 ereport(NOTICE,
469                                                 (errmsg("created replication slot \"%s\" on publisher",
470                                                                 slotname)));
471                         }
472                 }
473                 PG_CATCH();
474                 {
475                         /* Close the connection in case of failure. */
476                         walrcv_disconnect(wrconn);
477                         PG_RE_THROW();
478                 }
479                 PG_END_TRY();
480
481                 /* And we are done with the remote side. */
482                 walrcv_disconnect(wrconn);
483         }
484         else
485                 ereport(WARNING,
486                                 (errmsg("tables were not subscribed, you will have to run "
487                                                 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
488                                                 "subscribe the tables")));
489
490         heap_close(rel, RowExclusiveLock);
491
492         if (enabled)
493                 ApplyLauncherWakeupAtCommit();
494
495         ObjectAddressSet(myself, SubscriptionRelationId, subid);
496
497         InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
498
499         return myself;
500 }
501
502 static void
503 AlterSubscription_refresh(Subscription *sub, bool copy_data)
504 {
505         char       *err;
506         List       *pubrel_names;
507         List       *subrel_states;
508         Oid                *subrel_local_oids;
509         Oid                *pubrel_local_oids;
510         ListCell   *lc;
511         int                     off;
512
513         /* Load the library providing us libpq calls. */
514         load_file("libpqwalreceiver", false);
515
516         /* Try to connect to the publisher. */
517         wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
518         if (!wrconn)
519                 ereport(ERROR,
520                                 (errmsg("could not connect to the publisher: %s", err)));
521
522         /* Get the table list from publisher. */
523         pubrel_names = fetch_table_list(wrconn, sub->publications);
524
525         /* We are done with the remote side, close connection. */
526         walrcv_disconnect(wrconn);
527
528         /* Get local table list. */
529         subrel_states = GetSubscriptionRelations(sub->oid);
530
531         /*
532          * Build qsorted array of local table oids for faster lookup. This can
533          * potentially contain all tables in the database so speed of lookup is
534          * important.
535          */
536         subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
537         off = 0;
538         foreach(lc, subrel_states)
539         {
540                 SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
541
542                 subrel_local_oids[off++] = relstate->relid;
543         }
544         qsort(subrel_local_oids, list_length(subrel_states),
545                   sizeof(Oid), oid_cmp);
546
547         /*
548          * Walk over the remote tables and try to match them to locally known
549          * tables. If the table is not known locally create a new state for it.
550          *
551          * Also builds array of local oids of remote tables for the next step.
552          */
553         off = 0;
554         pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
555
556         foreach(lc, pubrel_names)
557         {
558                 RangeVar   *rv = (RangeVar *) lfirst(lc);
559                 Oid                     relid;
560
561                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
562
563                 /* Check for supported relkind. */
564                 CheckSubscriptionRelkind(get_rel_relkind(relid),
565                                                                  rv->schemaname, rv->relname);
566
567                 pubrel_local_oids[off++] = relid;
568
569                 if (!bsearch(&relid, subrel_local_oids,
570                                          list_length(subrel_states), sizeof(Oid), oid_cmp))
571                 {
572                         SetSubscriptionRelState(sub->oid, relid,
573                                                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
574                                                                         InvalidXLogRecPtr, false);
575                         ereport(DEBUG1,
576                                         (errmsg("table \"%s.%s\" added to subscription \"%s\"",
577                                                         rv->schemaname, rv->relname, sub->name)));
578                 }
579         }
580
581         /*
582          * Next remove state for tables we should not care about anymore using the
583          * data we collected above
584          */
585         qsort(pubrel_local_oids, list_length(pubrel_names),
586                   sizeof(Oid), oid_cmp);
587
588         for (off = 0; off < list_length(subrel_states); off++)
589         {
590                 Oid                     relid = subrel_local_oids[off];
591
592                 if (!bsearch(&relid, pubrel_local_oids,
593                                          list_length(pubrel_names), sizeof(Oid), oid_cmp))
594                 {
595                         RemoveSubscriptionRel(sub->oid, relid);
596
597                         logicalrep_worker_stop_at_commit(sub->oid, relid);
598
599                         ereport(DEBUG1,
600                                         (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
601                                                         get_namespace_name(get_rel_namespace(relid)),
602                                                         get_rel_name(relid),
603                                                         sub->name)));
604                 }
605         }
606 }
607
608 /*
609  * Alter the existing subscription.
610  */
611 ObjectAddress
612 AlterSubscription(AlterSubscriptionStmt *stmt)
613 {
614         Relation        rel;
615         ObjectAddress myself;
616         bool            nulls[Natts_pg_subscription];
617         bool            replaces[Natts_pg_subscription];
618         Datum           values[Natts_pg_subscription];
619         HeapTuple       tup;
620         Oid                     subid;
621         bool            update_tuple = false;
622         Subscription *sub;
623
624         rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
625
626         /* Fetch the existing tuple. */
627         tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
628                                                           CStringGetDatum(stmt->subname));
629
630         if (!HeapTupleIsValid(tup))
631                 ereport(ERROR,
632                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
633                                  errmsg("subscription \"%s\" does not exist",
634                                                 stmt->subname)));
635
636         /* must be owner */
637         if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
638                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
639                                            stmt->subname);
640
641         subid = HeapTupleGetOid(tup);
642         sub = GetSubscription(subid, false);
643
644         /* Lock the subscription so nobody else can do anything with it. */
645         LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
646
647         /* Form a new tuple. */
648         memset(values, 0, sizeof(values));
649         memset(nulls, false, sizeof(nulls));
650         memset(replaces, false, sizeof(replaces));
651
652         switch (stmt->kind)
653         {
654                 case ALTER_SUBSCRIPTION_OPTIONS:
655                         {
656                                 char       *slotname;
657                                 bool            slotname_given;
658                                 char       *synchronous_commit;
659
660                                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
661                                                                                    NULL, &slotname_given, &slotname,
662                                                                                    NULL, &synchronous_commit, NULL);
663
664                                 if (slotname_given)
665                                 {
666                                         if (sub->enabled && !slotname)
667                                                 ereport(ERROR,
668                                                                 (errcode(ERRCODE_SYNTAX_ERROR),
669                                                                  errmsg("cannot set slot_name = NONE for enabled subscription")));
670
671                                         if (slotname)
672                                                 values[Anum_pg_subscription_subslotname - 1] =
673                                                         DirectFunctionCall1(namein, CStringGetDatum(slotname));
674                                         else
675                                                 nulls[Anum_pg_subscription_subslotname - 1] = true;
676                                         replaces[Anum_pg_subscription_subslotname - 1] = true;
677                                 }
678
679                                 if (synchronous_commit)
680                                 {
681                                         values[Anum_pg_subscription_subsynccommit - 1] =
682                                                 CStringGetTextDatum(synchronous_commit);
683                                         replaces[Anum_pg_subscription_subsynccommit - 1] = true;
684                                 }
685
686                                 update_tuple = true;
687                                 break;
688                         }
689
690                 case ALTER_SUBSCRIPTION_ENABLED:
691                         {
692                                 bool            enabled,
693                                                         enabled_given;
694
695                                 parse_subscription_options(stmt->options, NULL,
696                                                                                    &enabled_given, &enabled, NULL,
697                                                                                    NULL, NULL, NULL, NULL, NULL);
698                                 Assert(enabled_given);
699
700                                 if (!sub->slotname && enabled)
701                                         ereport(ERROR,
702                                                         (errcode(ERRCODE_SYNTAX_ERROR),
703                                                          errmsg("cannot enable subscription that does not have a slot name")));
704
705                                 values[Anum_pg_subscription_subenabled - 1] =
706                                         BoolGetDatum(enabled);
707                                 replaces[Anum_pg_subscription_subenabled - 1] = true;
708
709                                 if (enabled)
710                                         ApplyLauncherWakeupAtCommit();
711
712                                 update_tuple = true;
713                                 break;
714                         }
715
716                 case ALTER_SUBSCRIPTION_CONNECTION:
717                         /* Load the library providing us libpq calls. */
718                         load_file("libpqwalreceiver", false);
719                         /* Check the connection info string. */
720                         walrcv_check_conninfo(stmt->conninfo);
721
722                         values[Anum_pg_subscription_subconninfo - 1] =
723                                 CStringGetTextDatum(stmt->conninfo);
724                         replaces[Anum_pg_subscription_subconninfo - 1] = true;
725                         update_tuple = true;
726                         break;
727
728                 case ALTER_SUBSCRIPTION_PUBLICATION:
729                         {
730                                 bool            copy_data;
731                                 bool            refresh;
732
733                                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
734                                                                                    NULL, NULL, NULL, &copy_data,
735                                                                                    NULL, &refresh);
736
737                                 values[Anum_pg_subscription_subpublications - 1] =
738                                         publicationListToArray(stmt->publication);
739                                 replaces[Anum_pg_subscription_subpublications - 1] = true;
740
741                                 update_tuple = true;
742
743                                 /* Refresh if user asked us to. */
744                                 if (refresh)
745                                 {
746                                         if (!sub->enabled)
747                                                 ereport(ERROR,
748                                                                 (errcode(ERRCODE_SYNTAX_ERROR),
749                                                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
750                                                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
751
752                                         /* Make sure refresh sees the new list of publications. */
753                                         sub->publications = stmt->publication;
754
755                                         AlterSubscription_refresh(sub, copy_data);
756                                 }
757
758                                 break;
759                         }
760
761                 case ALTER_SUBSCRIPTION_REFRESH:
762                         {
763                                 bool            copy_data;
764
765                                 if (!sub->enabled)
766                                         ereport(ERROR,
767                                                         (errcode(ERRCODE_SYNTAX_ERROR),
768                                                          errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
769
770                                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
771                                                                                    NULL, NULL, NULL, &copy_data,
772                                                                                    NULL, NULL);
773
774                                 AlterSubscription_refresh(sub, copy_data);
775
776                                 break;
777                         }
778
779                 default:
780                         elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
781                                  stmt->kind);
782         }
783
784         /* Update the catalog if needed. */
785         if (update_tuple)
786         {
787                 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
788                                                                 replaces);
789
790                 CatalogTupleUpdate(rel, &tup->t_self, tup);
791
792                 heap_freetuple(tup);
793         }
794
795         heap_close(rel, RowExclusiveLock);
796
797         ObjectAddressSet(myself, SubscriptionRelationId, subid);
798
799         InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
800
801         return myself;
802 }
803
804 /*
805  * Drop a subscription
806  */
807 void
808 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
809 {
810         Relation        rel;
811         ObjectAddress myself;
812         HeapTuple       tup;
813         Oid                     subid;
814         Datum           datum;
815         bool            isnull;
816         char       *subname;
817         char       *conninfo;
818         char       *slotname;
819         List       *subworkers;
820         ListCell   *lc;
821         char            originname[NAMEDATALEN];
822         char       *err = NULL;
823         RepOriginId originid;
824         WalReceiverConn *wrconn = NULL;
825         StringInfoData cmd;
826
827         /*
828          * Lock pg_subscription with AccessExclusiveLock to ensure that the
829          * launcher doesn't restart new worker during dropping the subscription
830          */
831         rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
832
833         tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
834                                                   CStringGetDatum(stmt->subname));
835
836         if (!HeapTupleIsValid(tup))
837         {
838                 heap_close(rel, NoLock);
839
840                 if (!stmt->missing_ok)
841                         ereport(ERROR,
842                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
843                                          errmsg("subscription \"%s\" does not exist",
844                                                         stmt->subname)));
845                 else
846                         ereport(NOTICE,
847                                         (errmsg("subscription \"%s\" does not exist, skipping",
848                                                         stmt->subname)));
849
850                 return;
851         }
852
853         subid = HeapTupleGetOid(tup);
854
855         /* must be owner */
856         if (!pg_subscription_ownercheck(subid, GetUserId()))
857                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
858                                            stmt->subname);
859
860         /* DROP hook for the subscription being removed */
861         InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
862
863         /*
864          * Lock the subscription so nobody else can do anything with it (including
865          * the replication workers).
866          */
867         LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
868
869         /* Get subname */
870         datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
871                                                         Anum_pg_subscription_subname, &isnull);
872         Assert(!isnull);
873         subname = pstrdup(NameStr(*DatumGetName(datum)));
874
875         /* Get conninfo */
876         datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
877                                                         Anum_pg_subscription_subconninfo, &isnull);
878         Assert(!isnull);
879         conninfo = TextDatumGetCString(datum);
880
881         /* Get slotname */
882         datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
883                                                         Anum_pg_subscription_subslotname, &isnull);
884         if (!isnull)
885                 slotname = pstrdup(NameStr(*DatumGetName(datum)));
886         else
887                 slotname = NULL;
888
889         /*
890          * Since dropping a replication slot is not transactional, the replication
891          * slot stays dropped even if the transaction rolls back.  So we cannot
892          * run DROP SUBSCRIPTION inside a transaction block if dropping the
893          * replication slot.
894          *
895          * XXX The command name should really be something like "DROP SUBSCRIPTION
896          * of a subscription that is associated with a replication slot", but we
897          * don't have the proper facilities for that.
898          */
899         if (slotname)
900                 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
901
902
903         ObjectAddressSet(myself, SubscriptionRelationId, subid);
904         EventTriggerSQLDropAddObject(&myself, true, true);
905
906         /* Remove the tuple from catalog. */
907         CatalogTupleDelete(rel, &tup->t_self);
908
909         ReleaseSysCache(tup);
910
911         /*
912          * Stop all the subscription workers immediately.
913          *
914          * This is necessary if we are dropping the replication slot, so that the
915          * slot becomes accessible.
916          *
917          * It is also necessary if the subscription is disabled and was disabled
918          * in the same transaction.  Then the workers haven't seen the disabling
919          * yet and will still be running, leading to hangs later when we want to
920          * drop the replication origin.  If the subscription was disabled before
921          * this transaction, then there shouldn't be any workers left, so this
922          * won't make a difference.
923          *
924          * New workers won't be started because we hold an exclusive lock on the
925          * subscription till the end of the transaction.
926          */
927         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
928         subworkers = logicalrep_workers_find(subid, false);
929         LWLockRelease(LogicalRepWorkerLock);
930         foreach(lc, subworkers)
931         {
932                 LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
933
934                 logicalrep_worker_stop(w->subid, w->relid);
935         }
936         list_free(subworkers);
937
938         /* Clean up dependencies */
939         deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
940
941         /* Remove any associated relation synchronization states. */
942         RemoveSubscriptionRel(subid, InvalidOid);
943
944         /* Remove the origin tracking if exists. */
945         snprintf(originname, sizeof(originname), "pg_%u", subid);
946         originid = replorigin_by_name(originname, true);
947         if (originid != InvalidRepOriginId)
948                 replorigin_drop(originid, false);
949
950         /*
951          * If there is no slot associated with the subscription, we can finish
952          * here.
953          */
954         if (!slotname)
955         {
956                 heap_close(rel, NoLock);
957                 return;
958         }
959
960         /*
961          * Otherwise drop the replication slot at the publisher node using the
962          * replication connection.
963          */
964         load_file("libpqwalreceiver", false);
965
966         initStringInfo(&cmd);
967         appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
968
969         wrconn = walrcv_connect(conninfo, true, subname, &err);
970         if (wrconn == NULL)
971                 ereport(ERROR,
972                                 (errmsg("could not connect to publisher when attempting to "
973                                                 "drop the replication slot \"%s\"", slotname),
974                                  errdetail("The error was: %s", err),
975                                  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
976                                                  "to disassociate the subscription from the slot.")));
977
978         PG_TRY();
979         {
980                 WalRcvExecResult *res;
981
982                 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
983
984                 if (res->status != WALRCV_OK_COMMAND)
985                         ereport(ERROR,
986                                         (errmsg("could not drop the replication slot \"%s\" on publisher",
987                                                         slotname),
988                                          errdetail("The error was: %s", res->err)));
989                 else
990                         ereport(NOTICE,
991                                         (errmsg("dropped replication slot \"%s\" on publisher",
992                                                         slotname)));
993
994                 walrcv_clear_result(res);
995         }
996         PG_CATCH();
997         {
998                 /* Close the connection in case of failure */
999                 walrcv_disconnect(wrconn);
1000                 PG_RE_THROW();
1001         }
1002         PG_END_TRY();
1003
1004         walrcv_disconnect(wrconn);
1005
1006         pfree(cmd.data);
1007
1008         heap_close(rel, NoLock);
1009 }
1010
1011 /*
1012  * Internal workhorse for changing a subscription owner
1013  */
1014 static void
1015 AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1016 {
1017         Form_pg_subscription form;
1018
1019         form = (Form_pg_subscription) GETSTRUCT(tup);
1020
1021         if (form->subowner == newOwnerId)
1022                 return;
1023
1024         if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1025                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1026                                            NameStr(form->subname));
1027
1028         /* New owner must be a superuser */
1029         if (!superuser_arg(newOwnerId))
1030                 ereport(ERROR,
1031                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1032                                  errmsg("permission denied to change owner of subscription \"%s\"",
1033                                                 NameStr(form->subname)),
1034                                  errhint("The owner of a subscription must be a superuser.")));
1035
1036         form->subowner = newOwnerId;
1037         CatalogTupleUpdate(rel, &tup->t_self, tup);
1038
1039         /* Update owner dependency reference */
1040         changeDependencyOnOwner(SubscriptionRelationId,
1041                                                         HeapTupleGetOid(tup),
1042                                                         newOwnerId);
1043
1044         InvokeObjectPostAlterHook(SubscriptionRelationId,
1045                                                           HeapTupleGetOid(tup), 0);
1046 }
1047
1048 /*
1049  * Change subscription owner -- by name
1050  */
1051 ObjectAddress
1052 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1053 {
1054         Oid                     subid;
1055         HeapTuple       tup;
1056         Relation        rel;
1057         ObjectAddress address;
1058
1059         rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1060
1061         tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1062                                                           CStringGetDatum(name));
1063
1064         if (!HeapTupleIsValid(tup))
1065                 ereport(ERROR,
1066                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
1067                                  errmsg("subscription \"%s\" does not exist", name)));
1068
1069         subid = HeapTupleGetOid(tup);
1070
1071         AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1072
1073         ObjectAddressSet(address, SubscriptionRelationId, subid);
1074
1075         heap_freetuple(tup);
1076
1077         heap_close(rel, RowExclusiveLock);
1078
1079         return address;
1080 }
1081
1082 /*
1083  * Change subscription owner -- by OID
1084  */
1085 void
1086 AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1087 {
1088         HeapTuple       tup;
1089         Relation        rel;
1090
1091         rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1092
1093         tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1094
1095         if (!HeapTupleIsValid(tup))
1096                 ereport(ERROR,
1097                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
1098                                  errmsg("subscription with OID %u does not exist", subid)));
1099
1100         AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1101
1102         heap_freetuple(tup);
1103
1104         heap_close(rel, RowExclusiveLock);
1105 }
1106
1107 /*
1108  * Get the list of tables which belong to specified publications on the
1109  * publisher connection.
1110  */
1111 static List *
1112 fetch_table_list(WalReceiverConn *wrconn, List *publications)
1113 {
1114         WalRcvExecResult *res;
1115         StringInfoData cmd;
1116         TupleTableSlot *slot;
1117         Oid                     tableRow[2] = {TEXTOID, TEXTOID};
1118         ListCell   *lc;
1119         bool            first;
1120         List       *tablelist = NIL;
1121
1122         Assert(list_length(publications) > 0);
1123
1124         initStringInfo(&cmd);
1125         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1126                                                    "  FROM pg_catalog.pg_publication_tables t\n"
1127                                                    " WHERE t.pubname IN (");
1128         first = true;
1129         foreach(lc, publications)
1130         {
1131                 char       *pubname = strVal(lfirst(lc));
1132
1133                 if (first)
1134                         first = false;
1135                 else
1136                         appendStringInfoString(&cmd, ", ");
1137
1138                 appendStringInfoString(&cmd, quote_literal_cstr(pubname));
1139         }
1140         appendStringInfoChar(&cmd, ')');
1141
1142         res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1143         pfree(cmd.data);
1144
1145         if (res->status != WALRCV_OK_TUPLES)
1146                 ereport(ERROR,
1147                                 (errmsg("could not receive list of replicated tables from the publisher: %s",
1148                                                 res->err)));
1149
1150         /* Process tables. */
1151         slot = MakeSingleTupleTableSlot(res->tupledesc);
1152         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1153         {
1154                 char       *nspname;
1155                 char       *relname;
1156                 bool            isnull;
1157                 RangeVar   *rv;
1158
1159                 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1160                 Assert(!isnull);
1161                 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1162                 Assert(!isnull);
1163
1164                 rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1165                 tablelist = lappend(tablelist, rv);
1166
1167                 ExecClearTuple(slot);
1168         }
1169         ExecDropSingleTupleTableSlot(slot);
1170
1171         walrcv_clear_result(res);
1172
1173         return tablelist;
1174 }