+/*
+ * RangeVarGetAndCheckCreationNamespace
+ *
+ * This function returns the OID of the namespace in which a new relation
+ * with a given name should be created. If the user does not have CREATE
+ * permission on the target namespace, this function will instead signal
+ * an ERROR.
+ *
+ * If non-NULL, *existing_oid is set to the OID of any existing relation with
+ * the same name which already exists in that namespace, or to InvalidOid if
+ * no such relation exists.
+ *
+ * If lockmode != NoLock, the specified lock mode is acquired on the existing
+ * relation, if any, provided that the current user owns the target relation.
+ * However, if lockmode != NoLock and the user does not own the target
+ * relation, we throw an ERROR, as we must not try to lock relations the
+ * user does not have permissions on.
+ *
+ * As a side effect, this function acquires AccessShareLock on the target
+ * namespace. Without this, the namespace could be dropped before our
+ * transaction commits, leaving behind relations with relnamespace pointing
+ * to a no-longer-exstant namespace.
+ *
+ * As a further side-effect, if the select namespace is a temporary namespace,
+ * we mark the RangeVar as RELPERSISTENCE_TEMP.
+ */
+Oid
+RangeVarGetAndCheckCreationNamespace(RangeVar *relation,
+ LOCKMODE lockmode,
+ Oid *existing_relation_id)
+{
+ uint64 inval_count;
+ Oid relid;
+ Oid oldrelid = InvalidOid;
+ Oid nspid;
+ Oid oldnspid = InvalidOid;
+ bool retry = false;
+
+ /*
+ * We check the catalog name and then ignore it.
+ */
+ if (relation->catalogname)
+ {
+ if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
+ relation->catalogname, relation->schemaname,
+ relation->relname)));
+ }
+
+ /*
+ * As in RangeVarGetRelidExtended(), we guard against concurrent DDL
+ * operations by tracking whether any invalidation messages are processed
+ * while we're doing the name lookups and acquiring locks. See comments
+ * in that function for a more detailed explanation of this logic.
+ */
+ for (;;)
+ {
+ AclResult aclresult;
+
+ inval_count = SharedInvalidMessageCounter;
+
+ /* Look up creation namespace and check for existing relation. */
+ nspid = RangeVarGetCreationNamespace(relation);
+ Assert(OidIsValid(nspid));
+ if (existing_relation_id != NULL)
+ relid = get_relname_relid(relation->relname, nspid);
+ else
+ relid = InvalidOid;
+
+ /*
+ * In bootstrap processing mode, we don't bother with permissions or
+ * locking. Permissions might not be working yet, and locking is
+ * unnecessary.
+ */
+ if (IsBootstrapProcessingMode())
+ break;
+
+ /* Check namespace permissions. */
+ aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ get_namespace_name(nspid));
+
+ if (retry)
+ {
+ /* If nothing changed, we're done. */
+ if (relid == oldrelid && nspid == oldnspid)
+ break;
+ /* If creation namespace has changed, give up old lock. */
+ if (nspid != oldnspid)
+ UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0,
+ AccessShareLock);
+ /* If name points to something different, give up old lock. */
+ if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock)
+ UnlockRelationOid(oldrelid, lockmode);
+ }
+
+ /* Lock namespace. */
+ if (nspid != oldnspid)
+ LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock);
+
+ /* Lock relation, if required if and we have permission. */
+ if (lockmode != NoLock && OidIsValid(relid))
+ {
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ relation->relname);
+ if (relid != oldrelid)
+ LockRelationOid(relid, lockmode);
+ }
+
+ /* If no invalidation message were processed, we're done! */
+ if (inval_count == SharedInvalidMessageCounter)
+ break;
+
+ /* Something may have changed, so recheck our work. */
+ retry = true;
+ oldrelid = relid;
+ oldnspid = nspid;
+ }
+
+ RangeVarAdjustRelationPersistence(relation, nspid);
+ if (existing_relation_id != NULL)
+ *existing_relation_id = relid;
+ return nspid;
+}
+
+/*
+ * Adjust the relpersistence for an about-to-be-created relation based on the
+ * creation namespace, and throw an error for invalid combinations.
+ */
+void
+RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid)
+{
+ switch (newRelation->relpersistence)
+ {
+ case RELPERSISTENCE_TEMP:
+ if (!isTempOrTempToastNamespace(nspid))
+ {
+ if (isAnyTempNamespace(nspid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot create relations in temporary schemas of other sessions")));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot create temporary relation in non-temporary schema")));
+ }
+ break;
+ case RELPERSISTENCE_PERMANENT:
+ if (isTempOrTempToastNamespace(nspid))
+ newRelation->relpersistence = RELPERSISTENCE_TEMP;
+ else if (isAnyTempNamespace(nspid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot create relations in temporary schemas of other sessions")));
+ break;
+ default:
+ if (isAnyTempNamespace(nspid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("only temporary relations may be created in temporary schemas")));
+ }
+}
+