/*
* RangeVarGetAndCheckCreationNamespace
- * As RangeVarGetCreationNamespace, but with a permissions check.
+ *
+ * This function returns the OID of the namespace in which a new relation
+ * with a given name should be created. If the user does not have CREATE
+ * permission on the target namespace, this function will instead signal
+ * an ERROR.
+ *
+ * If non-NULL, *existing_oid is set to the OID of any existing relation with
+ * the same name which already exists in that namespace, or to InvalidOid if
+ * no such relation exists.
+ *
+ * If lockmode != NoLock, the specified lock mode is acquire on the existing
+ * relation, if any, provided that the current user owns the target relation.
+ * However, if lockmode != NoLock and the user does not own the target
+ * relation, we throw an ERROR, as we must not try to lock relations the
+ * user does not have permissions on.
+ *
+ * As a side effect, this function acquires AccessShareLock on the target
+ * namespace. Without this, the namespace could be dropped before our
+ * transaction commits, leaving behind relations with relnamespace pointing
+ * to a no-longer-exstant namespace.
+ *
+ * As a further side-effect, if the select namespace is a temporary namespace,
+ * we mark the RangeVar as RELPERSISTENCE_TEMP.
*/
Oid
-RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
+RangeVarGetAndCheckCreationNamespace(RangeVar *relation,
+ LOCKMODE lockmode,
+ Oid *existing_relation_id)
{
- Oid namespaceId;
+ uint64 inval_count;
+ Oid relid;
+ Oid oldrelid = InvalidOid;
+ Oid nspid;
+ Oid oldnspid = InvalidOid;
+ bool retry = false;
- namespaceId = RangeVarGetCreationNamespace(newRelation);
+ /*
+ * We check the catalog name and then ignore it.
+ */
+ if (relation->catalogname)
+ {
+ if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
+ relation->catalogname, relation->schemaname,
+ relation->relname)));
+ }
/*
- * Check we have permission to create there. Skip check if bootstrapping,
- * since permissions machinery may not be working yet.
+ * As in RangeVarGetRelidExtended(), we guard against concurrent DDL
+ * operations by tracking whether any invalidation messages are processed
+ * while we're doing the name lookups and acquiring locks. See comments
+ * in that function for a more detailed explanation of this logic.
*/
- if (!IsBootstrapProcessingMode())
+ for (;;)
{
AclResult aclresult;
- aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
- ACL_CREATE);
+ inval_count = SharedInvalidMessageCounter;
+
+ /* Look up creation namespace and check for existing relation. */
+ nspid = RangeVarGetCreationNamespace(relation);
+ Assert(OidIsValid(nspid));
+ if (existing_relation_id != NULL)
+ relid = get_relname_relid(relation->relname, nspid);
+ else
+ relid = InvalidOid;
+
+ /*
+ * In bootstrap processing mode, we don't bother with permissions
+ * or locking. Permissions might not be working yet, and locking is
+ * unnecessary.
+ */
+ if (IsBootstrapProcessingMode())
+ break;
+
+ /* Check namespace permissions. */
+ aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
- get_namespace_name(namespaceId));
+ get_namespace_name(nspid));
+
+ if (retry)
+ {
+ /* If nothing changed, we're done. */
+ if (relid == oldrelid && nspid == oldnspid)
+ break;
+ /* If creation namespace has changed, give up old lock. */
+ if (nspid != oldnspid)
+ UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0,
+ AccessShareLock);
+ /* If name points to something different, give up old lock. */
+ if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock)
+ UnlockRelationOid(oldrelid, lockmode);
+ }
+
+ /* Lock namespace. */
+ if (nspid != oldnspid)
+ LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock);
+
+ /* Lock relation, if required if and we have permission. */
+ if (lockmode != NoLock && OidIsValid(relid))
+ {
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ relation->relname);
+ if (relid != oldrelid)
+ LockRelationOid(relid, lockmode);
+ }
+
+ /* If no invalidation message were processed, we're done! */
+ if (inval_count == SharedInvalidMessageCounter)
+ break;
+
+ /* Something may have changed, so recheck our work. */
+ retry = true;
+ oldrelid = relid;
+ oldnspid = nspid;
}
- return namespaceId;
+ RangeVarAdjustRelationPersistence(relation, nspid);
+ if (existing_relation_id != NULL)
+ *existing_relation_id = relid;
+ return nspid;
}
/*
/*
* Look up the namespace in which we are supposed to create the relation,
- * and check we have permission to create there.
+ * check we have permission to create there, lock it against concurrent
+ * drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary
+ * namespace is selected.
*/
- namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
- RangeVarAdjustRelationPersistence(stmt->relation, namespaceId);
+ namespaceId =
+ RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL);
/*
* Security check: disallow creating temp tables from security-restricted
Oid oldNspOid;
Oid nspOid;
Relation classRel;
+ RangeVar *newrv;
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
get_rel_name(tableId))));
}
- /* get schema OID and check its permissions */
- nspOid = LookupCreationNamespace(stmt->newschema);
+ /* Get and lock schema OID and check its permissions. */
+ newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
+ nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
* check is here mainly to get a better error message about a "type"
* instead of below about a "relation".
*/
- typeNamespace = RangeVarGetCreationNamespace(createStmt->relation);
+ typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation,
+ NoLock, NULL);
RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace);
old_type_oid =
GetSysCacheOid2(TYPENAMENSP,
*---------------------------------------------------------------------
*/
static Oid
-DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
- Oid namespaceId, List *options)
+DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace,
+ List *options)
{
Oid viewOid;
+ Oid namespaceId;
+ LOCKMODE lockmode;
CreateStmt *createStmt = makeNode(CreateStmt);
List *attrList;
ListCell *t;
errmsg("view must have at least one column")));
/*
- * Check to see if we want to replace an existing view.
+ * Look up, check permissions on, and lock the creation namespace; also
+ * check for a preexisting view with the same name. This will also set
+ * relation->relpersistence to RELPERSISTENCE_TEMP if the selected
+ * namespace is temporary.
*/
- viewOid = get_relname_relid(relation->relname, namespaceId);
+ lockmode = replace ? AccessExclusiveLock : NoLock;
+ namespaceId =
+ RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid);
if (OidIsValid(viewOid) && replace)
{
List *atcmds = NIL;
AlterTableCmd *atcmd;
- /*
- * Yes. Get exclusive lock on the existing view ...
- */
- rel = relation_open(viewOid, AccessExclusiveLock);
+ /* Relation is already locked, but we must build a relcache entry. */
+ rel = relation_open(viewOid, NoLock);
- /*
- * Make sure it *is* a view, and do permissions checks.
- */
+ /* Make sure it *is* a view. */
if (rel->rd_rel->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(rel))));
- if (!pg_class_ownercheck(viewOid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
/* Also check it's not in use already */
CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
{
Query *viewParse;
Oid viewOid;
- Oid namespaceId;
RangeVar *view;
/*
view->relname)));
}
- /* Might also need to make it temporary if placed in temp schema. */
- namespaceId = RangeVarGetCreationNamespace(view);
- RangeVarAdjustRelationPersistence(view, namespaceId);
-
/*
* Create the view relation
*
* aborted.
*/
viewOid = DefineVirtualRelation(view, viewParse->targetList,
- stmt->replace, namespaceId, stmt->options);
+ stmt->replace, stmt->options);
/*
* The relation we have just created is not visible to any other commands
}
/*
- * Find namespace to create in, check its permissions
+ * Find namespace to create in, check its permissions, lock it against
+ * concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the
+ * selected namespace is temporary.
*/
intoName = into->rel->relname;
- namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
- RangeVarAdjustRelationPersistence(into->rel, namespaceId);
+ namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock,
+ NULL);
/*
* Security check: disallow creating temp tables from security-restricted
List *save_alist;
ListCell *elements;
Oid namespaceid;
+ Oid existing_relid;
/*
* We must not scribble on the passed-in CreateStmt, so copy it. (This is
/*
* Look up the creation namespace. This also checks permissions on the
- * target namespace, so that we throw any permissions error as early as
- * possible.
+ * target namespace, locks it against concurrent drops, checks for a
+ * preexisting relation in that namespace with the same name, and updates
+ * stmt->relation->relpersistence if the select namespace is temporary.
*/
- namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
- RangeVarAdjustRelationPersistence(stmt->relation, namespaceid);
+ namespaceid =
+ RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock,
+ &existing_relid);
/*
* If the relation already exists and the user specified "IF NOT EXISTS",
* bail out with a NOTICE.
*/
- if (stmt->if_not_exists)
+ if (stmt->if_not_exists && OidIsValid(existing_relid))
{
- Oid existing_relid;
-
- existing_relid = get_relname_relid(stmt->relation->relname,
- namespaceid);
- if (existing_relid != InvalidOid)
- {
- ereport(NOTICE,
- (errcode(ERRCODE_DUPLICATE_TABLE),
- errmsg("relation \"%s\" already exists, skipping",
- stmt->relation->relname)));
- return NIL;
- }
+ ereport(NOTICE,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("relation \"%s\" already exists, skipping",
+ stmt->relation->relname)));
+ return NIL;
}
/*
RangeVarGetRelidCallback callback,
void *callback_arg);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
-extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
+extern Oid RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation,
+ LOCKMODE lockmode,
+ Oid *existing_relation_id);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
extern Oid RelnameGetRelid(const char *relname);
extern bool RelationIsVisible(Oid relid);