/*------------------------------------------------------------------------- * * publicationcmds.c * publication manipulation * * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * publicationcmds.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "funcapi.h" #include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits_fn.h" #include "catalog/pg_type.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/publicationcmds.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" /* Same as MAXNUMMESSAGES in sinvaladt.c */ #define MAX_RELCACHE_INVAL_MSGS 4096 static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); static void parse_publication_options(List *options, bool *publish_insert_given, bool *publish_insert, bool *publish_update_given, bool *publish_update, bool *publish_delete_given, bool *publish_delete) { ListCell *lc; *publish_insert_given = false; *publish_update_given = false; *publish_delete_given = false; /* Defaults are true */ *publish_insert = true; *publish_update = true; *publish_delete = true; /* Parse options */ foreach (lc, options) { DefElem *defel = (DefElem *) lfirst(lc); if (strcmp(defel->defname, "publish insert") == 0) { if (*publish_insert_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_insert_given = true; *publish_insert = defGetBoolean(defel); } else if (strcmp(defel->defname, "nopublish insert") == 0) { if (*publish_insert_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_insert_given = true; *publish_insert = !defGetBoolean(defel); } else if (strcmp(defel->defname, "publish update") == 0) { if (*publish_update_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_update_given = true; *publish_update = defGetBoolean(defel); } else if (strcmp(defel->defname, "nopublish update") == 0) { if (*publish_update_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_update_given = true; *publish_update = !defGetBoolean(defel); } else if (strcmp(defel->defname, "publish delete") == 0) { if (*publish_delete_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_delete_given = true; *publish_delete = defGetBoolean(defel); } else if (strcmp(defel->defname, "nopublish delete") == 0) { if (*publish_delete_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); *publish_delete_given = true; *publish_delete = !defGetBoolean(defel); } else elog(ERROR, "unrecognized option: %s", defel->defname); } } /* * Create new publication. */ ObjectAddress CreatePublication(CreatePublicationStmt *stmt) { Relation rel; ObjectAddress myself; Oid puboid; bool nulls[Natts_pg_publication]; Datum values[Natts_pg_publication]; HeapTuple tup; bool publish_insert_given; bool publish_update_given; bool publish_delete_given; bool publish_insert; bool publish_update; bool publish_delete; AclResult aclresult; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_DATABASE, get_database_name(MyDatabaseId)); /* FOR ALL TABLES requires superuser */ if (stmt->for_all_tables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to create FOR ALL TABLES publication")))); rel = heap_open(PublicationRelationId, RowExclusiveLock); /* Check if name is used */ puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname)); if (OidIsValid(puboid)) { ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("publication \"%s\" already exists", stmt->pubname))); } /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); values[Anum_pg_publication_pubname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname)); values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId()); parse_publication_options(stmt->options, &publish_insert_given, &publish_insert, &publish_update_given, &publish_update, &publish_delete_given, &publish_delete); values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert); values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update); values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ puboid = CatalogTupleInsert(rel, tup); heap_freetuple(tup); recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId()); ObjectAddressSet(myself, PublicationRelationId, puboid); /* Make the changes visible. */ CommandCounterIncrement(); if (stmt->tables) { List *rels; Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); PublicationAddTables(puboid, rels, true, NULL); CloseTableList(rels); } heap_close(rel, RowExclusiveLock); InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); return myself; } /* * Change options of a publication. */ static void AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup) { bool nulls[Natts_pg_publication]; bool replaces[Natts_pg_publication]; Datum values[Natts_pg_publication]; bool publish_insert_given; bool publish_update_given; bool publish_delete_given; bool publish_insert; bool publish_update; bool publish_delete; ObjectAddress obj; parse_publication_options(stmt->options, &publish_insert_given, &publish_insert, &publish_update_given, &publish_update, &publish_delete_given, &publish_delete); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); if (publish_insert_given) { values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert); replaces[Anum_pg_publication_pubinsert - 1] = true; } if (publish_update_given) { values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update); replaces[Anum_pg_publication_pubupdate - 1] = true; } if (publish_delete_given) { values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); replaces[Anum_pg_publication_pubdelete - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); /* Update the catalog. */ CatalogTupleUpdate(rel, &tup->t_self, tup); CommandCounterIncrement(); /* Invalidate the relcache. */ if (((Form_pg_publication) GETSTRUCT(tup))->puballtables) { CacheInvalidateRelcacheAll(); } else { List *relids = GetPublicationRelations(HeapTupleGetOid(tup)); /* * We don't want to send too many individual messages, at some point * it's cheaper to just reset whole relcache. */ if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS) { ListCell *lc; foreach (lc, relids) { Oid relid = lfirst_oid(lc); CacheInvalidateRelcacheByRelid(relid); } } else CacheInvalidateRelcacheAll(); } ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup)); EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, (Node *) stmt); InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0); } /* * Add or remove table to/from publication. */ static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup) { Oid pubid = HeapTupleGetOid(tup); List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); /* Check that user is allowed to manipulate the publication tables. */ if (pubform->puballtables) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) PublicationAddTables(pubid, rels, false, stmt); else if (stmt->tableAction == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { List *oldrelids = GetPublicationRelations(pubid); List *delrels = NIL; ListCell *oldlc; /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { Oid oldrelid = lfirst_oid(oldlc); ListCell *newlc; bool found = false; foreach(newlc, rels) { Relation newrel = (Relation) lfirst(newlc); if (RelationGetRelid(newrel) == oldrelid) { found = true; break; } } if (!found) { Relation oldrel = heap_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); } } /* And drop them. */ PublicationDropTables(pubid, delrels, true); /* * Don't bother calculating the difference for adding, we'll catch * and skip existing ones when doing catalog update. */ PublicationAddTables(pubid, rels, true, stmt); CloseTableList(delrels); } CloseTableList(rels); } /* * Alter the existing publication. * * This is dispatcher function for AlterPublicationOptions and * AlterPublicationTables. */ void AlterPublication(AlterPublicationStmt *stmt) { Relation rel; HeapTuple tup; rel = heap_open(PublicationRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname)); if (!HeapTupleIsValid(tup)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication \"%s\" does not exist", stmt->pubname))); /* must be owner */ if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION, stmt->pubname); if (stmt->options) AlterPublicationOptions(stmt, rel, tup); else AlterPublicationTables(stmt, rel, tup); /* Cleanup. */ heap_freetuple(tup); heap_close(rel, RowExclusiveLock); } /* * Drop publication by OID */ void RemovePublicationById(Oid pubid) { Relation rel; HeapTuple tup; rel = heap_open(PublicationRelationId, RowExclusiveLock); tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for publication %u", pubid); CatalogTupleDelete(rel, &tup->t_self); ReleaseSysCache(tup); heap_close(rel, RowExclusiveLock); } /* * Remove relation from publication by mapping OID. */ void RemovePublicationRelById(Oid proid) { Relation rel; HeapTuple tup; Form_pg_publication_rel pubrel; rel = heap_open(PublicationRelRelationId, RowExclusiveLock); tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for publication table %u", proid); pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheByRelid(pubrel->prrelid); CatalogTupleDelete(rel, &tup->t_self); ReleaseSysCache(tup); heap_close(rel, RowExclusiveLock); } /* * Open relations based on provided by RangeVar list. * The returned tables are locked in ShareUpdateExclusiveLock mode. */ static List * OpenTableList(List *tables) { List *relids = NIL; List *rels = NIL; ListCell *lc; /* * Open, share-lock, and check all the explicitly-specified relations */ foreach(lc, tables) { RangeVar *rv = lfirst(lc); Relation rel; bool recurse = rv->inh; Oid myrelid; CHECK_FOR_INTERRUPTS(); rel = heap_openrv(rv, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); /* * filter out duplicates when user specifies "foo, foo" * Note that this algorithm is known to not be very efficient (O(N^2)) * but given that it only works on list of tables given to us by user * it's deemed acceptable. */ if (list_member_oid(relids, myrelid)) { heap_close(rel, ShareUpdateExclusiveLock); continue; } rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); if (recurse) { ListCell *child; List *children; children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock, NULL); foreach(child, children) { Oid childrelid = lfirst_oid(child); if (list_member_oid(relids, childrelid)) continue; /* * Skip duplicates if user specified both parent and child * tables. */ if (list_member_oid(relids, childrelid)) { heap_close(rel, ShareUpdateExclusiveLock); continue; } /* find_all_inheritors already got lock */ rel = heap_open(childrelid, NoLock); rels = lappend(rels, rel); relids = lappend_oid(relids, childrelid); } } } list_free(relids); return rels; } /* * Close all relations in the list. */ static void CloseTableList(List *rels) { ListCell *lc; foreach(lc, rels) { Relation rel = (Relation) lfirst(lc); heap_close(rel, NoLock); } } /* * Add listed tables to the publication. */ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; Assert(!stmt || !stmt->for_all_tables); foreach(lc, rels) { Relation rel = (Relation) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. */ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, RelationGetRelationName(rel)); obj = publication_add_relation(pubid, rel, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, (Node *) stmt); InvokeObjectPostCreateHook(PublicationRelRelationId, obj.objectId, 0); } } } /* * Remove listed tables from the publication. */ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok) { ObjectAddress obj; ListCell *lc; Oid prid; foreach(lc, rels) { Relation rel = (Relation) lfirst(lc); Oid relid = RelationGetRelid(rel); prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); if (!OidIsValid(prid)) { if (missing_ok) continue; ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" is not part of the publication", RelationGetRelationName(rel)))); } ObjectAddressSet(obj, PublicationRelRelationId, prid); performDeletion(&obj, DROP_CASCADE, 0); } } /* * Internal workhorse for changing a publication owner */ static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) { Form_pg_publication form; form = (Form_pg_publication) GETSTRUCT(tup); if (form->pubowner == newOwnerId) return; if (!superuser()) { AclResult aclresult; /* Must be owner */ if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION, NameStr(form->pubname)); /* Must be able to become new owner */ check_is_member_of_role(GetUserId(), newOwnerId); /* New owner must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_DATABASE, get_database_name(MyDatabaseId)); if (form->puballtables && !superuser_arg(newOwnerId)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to change owner of publication \"%s\"", NameStr(form->pubname)), errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); } form->pubowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); /* Update owner dependency reference */ changeDependencyOnOwner(PublicationRelationId, HeapTupleGetOid(tup), newOwnerId); InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0); } /* * Change publication owner -- by name */ ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId) { Oid subid; HeapTuple tup; Relation rel; ObjectAddress address; rel = heap_open(PublicationRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name)); if (!HeapTupleIsValid(tup)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication \"%s\" does not exist", name))); subid = HeapTupleGetOid(tup); AlterPublicationOwner_internal(rel, tup, newOwnerId); ObjectAddressSet(address, PublicationRelationId, subid); heap_freetuple(tup); heap_close(rel, RowExclusiveLock); return address; } /* * Change publication owner -- by OID */ void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId) { HeapTuple tup; Relation rel; rel = heap_open(PublicationRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid)); if (!HeapTupleIsValid(tup)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication with OID %u does not exist", subid))); AlterPublicationOwner_internal(rel, tup, newOwnerId); heap_freetuple(tup); heap_close(rel, RowExclusiveLock); }