*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.334 2010/07/25 23:21:21 rhaas Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.335 2010/07/28 05:22:24 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+#include "storage/lock.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
Oid oldNspOid, Oid newNspOid);
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
- const char *newNspName);
+ const char *newNspName, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
Oid pkindOid, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid);
-static void ATController(Relation rel, List *cmds, bool recurse);
+static void ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
- bool recurse, bool recursing);
-static void ATRewriteCatalogs(List **wqueue);
+ bool recurse, bool recursing, LOCKMODE lockmode);
+static void ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode);
static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
- AlterTableCmd *cmd);
-static void ATRewriteTables(List **wqueue);
-static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
+ AlterTableCmd *cmd, LOCKMODE lockmode);
+static void ATRewriteTables(List **wqueue, LOCKMODE lockmode);
+static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
static void ATSimplePermissions(Relation rel, bool allowView);
static void ATSimplePermissionsRelationOrIndex(Relation rel);
static void ATSimpleRecursion(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse);
+ AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode);
static void ATOneLevelRecursion(List **wqueue, Relation rel,
- AlterTableCmd *cmd);
+ AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
- AlterTableCmd *cmd);
+ AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
- ColumnDef *colDef, bool isOid);
+ ColumnDef *colDef, bool isOid, LOCKMODE lockmode);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
- AlterTableCmd *cmd);
-static void ATExecDropNotNull(Relation rel, const char *colName);
+ AlterTableCmd *cmd, LOCKMODE lockmode);
+static void ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName);
+ const char *colName, LOCKMODE lockmode);
static void ATExecColumnDefault(Relation rel, const char *colName,
- Node *newDefault);
+ Node *newDefault, LOCKMODE lockmode);
static void ATPrepSetStatistics(Relation rel, const char *colName,
- Node *newValue);
+ Node *newValue, LOCKMODE lockmode);
static void ATExecSetStatistics(Relation rel, const char *colName,
- Node *newValue);
+ Node *newValue, LOCKMODE lockmode);
static void ATExecSetOptions(Relation rel, const char *colName,
- Node *options, bool isReset);
+ Node *options, bool isReset, LOCKMODE lockmode);
static void ATExecSetStorage(Relation rel, const char *colName,
- Node *newValue);
+ Node *newValue, LOCKMODE lockmode);
static void ATPrepDropColumn(Relation rel, bool recurse, AlterTableCmd *cmd);
static void ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
DropBehavior behavior,
bool recurse, bool recursing,
- bool missing_ok);
+ bool missing_ok, LOCKMODE lockmode);
static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
- IndexStmt *stmt, bool is_rebuild);
+ IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode);
static void ATExecAddConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel,
- Constraint *newConstraint, bool recurse);
+ Constraint *newConstraint, bool recurse, LOCKMODE lockmode);
static void ATAddCheckConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel,
Constraint *constr,
- bool recurse, bool recursing);
+ bool recurse, bool recursing, LOCKMODE lockmode);
static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
- Constraint *fkconstraint);
+ Constraint *fkconstraint, LOCKMODE lockmode);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
- bool missing_ok);
+ bool missing_ok, LOCKMODE lockmode);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
- AlterTableCmd *cmd);
+ AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
- const char *colName, TypeName *typeName);
-static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
-static void ATPostAlterTypeParse(char *cmd, List **wqueue);
+ const char *colName, TypeName *typeName, LOCKMODE lockmode);
+static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode);
+static void ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode);
static void change_owner_recurse_to_sequences(Oid relationOid,
- Oid newOwnerId);
-static void ATExecClusterOn(Relation rel, const char *indexName);
-static void ATExecDropCluster(Relation rel);
+ Oid newOwnerId, LOCKMODE lockmode);
+static void ATExecClusterOn(Relation rel, const char *indexName, LOCKMODE lockmode);
+static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
- char *tablespacename);
-static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
-static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset);
+ char *tablespacename, LOCKMODE lockmode);
+static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode);
+static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
- char fires_when, bool skip_system);
+ char fires_when, bool skip_system, LOCKMODE lockmode);
static void ATExecEnableDisableRule(Relation rel, char *rulename,
- char fires_when);
+ char fires_when, LOCKMODE lockmode);
static void ATPrepAddInherit(Relation child_rel);
-static void ATExecAddInherit(Relation child_rel, RangeVar *parent);
-static void ATExecDropInherit(Relation rel, RangeVar *parent);
+static void ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode);
+static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
ForkNumber forkNum, bool istemp);
static const char *storage_name(char c);
* Disallow ALTER TABLE (and similar commands) when the current backend has
* any open reference to the target table besides the one just acquired by
* the calling command; this implies there's an open cursor or active plan.
- * We need this check because our AccessExclusiveLock doesn't protect us
- * against stomping on our own foot, only other people's feet!
+ * We need this check because our lock doesn't protect us against stomping
+ * on our own foot, only other people's feet!
*
* For ALTER TABLE, the only case known to cause serious trouble is ALTER
* COLUMN TYPE, and some changes are obviously pretty benign, so this could
*
* Thanks to the magic of MVCC, an error anywhere along the way rolls back
* the whole operation; we don't have to do anything special to clean up.
+ *
+ * We lock the table as the first action, with an appropriate lock level
+ * for the subcommands requested. Any subcommand that needs to rewrite
+ * tuples in the table forces the whole command to be executed with
+ * AccessExclusiveLock. If all subcommands do not require rewrite table
+ * then we may be able to use lower lock levels. We pass the lock level down
+ * so that we can apply it recursively to inherited tables. Note that the
+ * lock level we want as we recurse may well be higher than required for
+ * that specific subcommand. So we pass down the overall lock requirement,
+ * rather than reassess it at lower levels.
*/
void
AlterTable(AlterTableStmt *stmt)
{
- Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+ Relation rel;
+ LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
+
+ /*
+ * Acquire same level of lock as already acquired during parsing.
+ */
+ rel = relation_openrv(stmt->relation, lockmode);
CheckTableNotInUse(rel, "ALTER TABLE");
elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
}
- ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
+ ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt),
+ lockmode);
}
/*
void
AlterTableInternal(Oid relid, List *cmds, bool recurse)
{
- Relation rel = relation_open(relid, AccessExclusiveLock);
+ Relation rel;
+ LOCKMODE lockmode = AlterTableGetLockLevel(cmds);
- ATController(rel, cmds, recurse);
+ rel = relation_open(relid, lockmode);
+
+ ATController(rel, cmds, recurse, lockmode);
+}
+
+/*
+ * AlterTableGetLockLevel
+ *
+ * Sets the overall lock level required for the supplied list of subcommands.
+ * Policy for doing this set according to needs of AlterTable(), see
+ * comments there for overall explanation.
+ *
+ * Function is called before and after parsing, so it must give same
+ * answer each time it is called. Some subcommands are transformed
+ * into other subcommand types, so the transform must never be made to a
+ * lower lock level than previously assigned. All transforms are noted below.
+ *
+ * Since this is called before we lock the table we cannot use table metadata
+ * to influence the type of lock we acquire.
+ *
+ * There should be no lockmodes hardcoded into the subcommand functions. All
+ * lockmode decisions for ALTER TABLE are made here only. The one exception is
+ * ALTER TABLE RENAME which is treated as a different statement type T_RenameStmt
+ * and does not travel through this section of code and cannot be combined with
+ * any of the subcommands given here.
+ */
+LOCKMODE
+AlterTableGetLockLevel(List *cmds)
+{
+ ListCell *lcmd;
+ LOCKMODE lockmode = ShareUpdateExclusiveLock;
+
+ foreach(lcmd, cmds)
+ {
+ AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
+ LOCKMODE cmd_lockmode = AccessExclusiveLock; /* default for compiler */
+
+ switch (cmd->subtype)
+ {
+ /*
+ * Need AccessExclusiveLock for these subcommands because they
+ * affect or potentially affect both read and write operations.
+ *
+ * New subcommand types should be added here by default.
+ */
+ case AT_AddColumn: /* may rewrite heap, in some cases and visible to SELECT */
+ case AT_DropColumn: /* change visible to SELECT */
+ case AT_AddColumnToView: /* CREATE VIEW */
+ case AT_AlterColumnType: /* must rewrite heap */
+ case AT_DropConstraint: /* as DROP INDEX */
+ case AT_AddOids: /* must rewrite heap */
+ case AT_DropOids: /* calls AT_DropColumn */
+ case AT_EnableAlwaysRule: /* may change SELECT rules */
+ case AT_EnableReplicaRule: /* may change SELECT rules */
+ case AT_EnableRule: /* may change SELECT rules */
+ case AT_DisableRule: /* may change SELECT rules */
+ case AT_ChangeOwner: /* change visible to SELECT */
+ case AT_SetTableSpace: /* must rewrite heap */
+ case AT_DropNotNull: /* may change some SQL plans */
+ case AT_SetNotNull:
+ cmd_lockmode = AccessExclusiveLock;
+ break;
+
+ /*
+ * These subcommands affect write operations only.
+ */
+ case AT_ColumnDefault:
+ case AT_ProcessedConstraint: /* becomes AT_AddConstraint */
+ case AT_AddConstraintRecurse: /* becomes AT_AddConstraint */
+ case AT_EnableTrig:
+ case AT_EnableAlwaysTrig:
+ case AT_EnableReplicaTrig:
+ case AT_EnableTrigAll:
+ case AT_EnableTrigUser:
+ case AT_DisableTrig:
+ case AT_DisableTrigAll:
+ case AT_DisableTrigUser:
+ case AT_AddIndex: /* from ADD CONSTRAINT */
+ cmd_lockmode = ShareRowExclusiveLock;
+ break;
+
+ case AT_AddConstraint:
+ if (IsA(cmd->def, Constraint))
+ {
+ Constraint *con = (Constraint *) cmd->def;
+
+ switch (con->contype)
+ {
+ case CONSTR_EXCLUSION:
+ case CONSTR_PRIMARY:
+ case CONSTR_UNIQUE:
+ /*
+ * Cases essentially the same as CREATE INDEX. We
+ * could reduce the lock strength to ShareLock if we
+ * can work out how to allow concurrent catalog updates.
+ */
+ cmd_lockmode = ShareRowExclusiveLock;
+ break;
+ case CONSTR_FOREIGN:
+ /*
+ * We add triggers to both tables when we add a
+ * Foreign Key, so the lock level must be at least
+ * as strong as CREATE TRIGGER.
+ */
+ cmd_lockmode = ShareRowExclusiveLock;
+ break;
+
+ default:
+ cmd_lockmode = ShareRowExclusiveLock;
+ }
+ }
+ break;
+
+ /*
+ * These subcommands affect inheritance behaviour. Queries started before us
+ * will continue to see the old inheritance behaviour, while queries started
+ * after we commit will see new behaviour. No need to prevent reads or writes
+ * to the subtable while we hook it up though. In both cases the parent table
+ * is locked with AccessShareLock.
+ */
+ case AT_AddInherit:
+ case AT_DropInherit:
+ cmd_lockmode = ShareUpdateExclusiveLock;
+ break;
+
+ /*
+ * These subcommands affect general strategies for performance and maintenance,
+ * though don't change the semantic results from normal data reads and writes.
+ * Delaying an ALTER TABLE behind currently active writes only delays the point
+ * where the new strategy begins to take effect, so there is no benefit in waiting.
+ * In thise case the minimum restriction applies: we don't currently allow
+ * concurrent catalog updates.
+ */
+ case AT_SetStatistics:
+ case AT_ClusterOn:
+ case AT_DropCluster:
+ case AT_SetRelOptions:
+ case AT_ResetRelOptions:
+ case AT_SetStorage:
+ cmd_lockmode = ShareUpdateExclusiveLock;
+ break;
+
+ default: /* oops */
+ elog(ERROR, "unrecognized alter table type: %d",
+ (int) cmd->subtype);
+ break;
+ }
+
+ /*
+ * Take the greatest lockmode from any subcommand
+ */
+ if (cmd_lockmode > lockmode)
+ lockmode = cmd_lockmode;
+ }
+
+ return lockmode;
}
static void
-ATController(Relation rel, List *cmds, bool recurse)
+ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode)
{
List *wqueue = NIL;
ListCell *lcmd;
{
AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
- ATPrepCmd(&wqueue, rel, cmd, recurse, false);
+ ATPrepCmd(&wqueue, rel, cmd, recurse, false, lockmode);
}
/* Close the relation, but keep lock until commit */
relation_close(rel, NoLock);
/* Phase 2: update system catalogs */
- ATRewriteCatalogs(&wqueue);
+ ATRewriteCatalogs(&wqueue, lockmode);
/* Phase 3: scan/rewrite tables as needed */
- ATRewriteTables(&wqueue);
+ ATRewriteTables(&wqueue, lockmode);
}
/*
* Traffic cop for ALTER TABLE Phase 1 operations, including simple
* recursion and permission checks.
*
- * Caller must have acquired AccessExclusiveLock on relation already.
+ * Caller must have acquired appropriate lock type on relation already.
* This lock should be held until commit.
*/
static void
ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
- bool recurse, bool recursing)
+ bool recurse, bool recursing, LOCKMODE lockmode)
{
AlteredTableInfo *tab;
int pass;
case AT_AddColumn: /* ADD COLUMN */
ATSimplePermissions(rel, false);
/* Performs own recursion */
- ATPrepAddColumn(wqueue, rel, recurse, cmd);
+ ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
break;
case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */
ATSimplePermissions(rel, true);
/* Performs own recursion */
- ATPrepAddColumn(wqueue, rel, recurse, cmd);
+ ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
* rules.
*/
ATSimplePermissions(rel, true);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(rel, false);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_DROP;
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(rel, false);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
break;
case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* Performs own permission checks */
- ATPrepSetStatistics(rel, cmd->name, cmd->def);
+ ATPrepSetStatistics(rel, cmd->name, cmd->def, lockmode);
pass = AT_PASS_COL_ATTRS;
break;
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, false);
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_COL_ATTRS;
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATSimplePermissions(rel, false);
/* Performs own recursion */
- ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
+ ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
pass = AT_PASS_ALTER_TYPE;
break;
case AT_ChangeOwner: /* ALTER OWNER */
ATSimplePermissions(rel, false);
/* Performs own recursion */
if (!rel->rd_rel->relhasoids || recursing)
- ATPrepAddOids(wqueue, rel, recurse, cmd);
+ ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL;
break;
case AT_DropOids: /* SET WITHOUT OIDS */
dropCmd->subtype = AT_DropColumn;
dropCmd->name = pstrdup("oid");
dropCmd->behavior = cmd->behavior;
- ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
+ ATPrepCmd(wqueue, rel, dropCmd, recurse, false, lockmode);
}
pass = AT_PASS_DROP;
break;
case AT_SetTableSpace: /* SET TABLESPACE */
ATSimplePermissionsRelationOrIndex(rel);
/* This command never recurses */
- ATPrepSetTableSpace(tab, rel, cmd->name);
+ ATPrepSetTableSpace(tab, rel, cmd->name, lockmode);
pass = AT_PASS_MISC; /* doesn't actually matter */
break;
case AT_SetRelOptions: /* SET (...) */
* conflicts).
*/
static void
-ATRewriteCatalogs(List **wqueue)
+ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode)
{
int pass;
ListCell *ltab;
continue;
/*
- * Exclusive lock was obtained by phase 1, needn't get it again
+ * Appropriate lock was obtained by phase 1, needn't get it again
*/
rel = relation_open(tab->relid, NoLock);
foreach(lcmd, subcmds)
- ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
+ ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd), lockmode);
/*
* After the ALTER TYPE pass, do cleanup work (this is not done in
* multiple columns of a table are altered).
*/
if (pass == AT_PASS_ALTER_TYPE)
- ATPostAlterTypeCleanup(wqueue, tab);
+ ATPostAlterTypeCleanup(wqueue, tab, lockmode);
relation_close(rel, NoLock);
}
*/
static void
ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
- AlterTableCmd *cmd)
+ AlterTableCmd *cmd, LOCKMODE lockmode)
{
switch (cmd->subtype)
{
case AT_AddColumn: /* ADD COLUMN */
case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */
- ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false);
+ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false, lockmode);
break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
- ATExecColumnDefault(rel, cmd->name, cmd->def);
+ ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
- ATExecDropNotNull(rel, cmd->name);
+ ATExecDropNotNull(rel, cmd->name, lockmode);
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
- ATExecSetNotNull(tab, rel, cmd->name);
+ ATExecSetNotNull(tab, rel, cmd->name, lockmode);
break;
case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */
- ATExecSetStatistics(rel, cmd->name, cmd->def);
+ ATExecSetStatistics(rel, cmd->name, cmd->def, lockmode);
break;
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
- ATExecSetOptions(rel, cmd->name, cmd->def, false);
+ ATExecSetOptions(rel, cmd->name, cmd->def, false, lockmode);
break;
case AT_ResetOptions: /* ALTER COLUMN RESET ( options ) */
- ATExecSetOptions(rel, cmd->name, cmd->def, true);
+ ATExecSetOptions(rel, cmd->name, cmd->def, true, lockmode);
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
- ATExecSetStorage(rel, cmd->name, cmd->def);
+ ATExecSetStorage(rel, cmd->name, cmd->def, lockmode);
break;
case AT_DropColumn: /* DROP COLUMN */
ATExecDropColumn(wqueue, rel, cmd->name,
- cmd->behavior, false, false, cmd->missing_ok);
+ cmd->behavior, false, false, cmd->missing_ok, lockmode);
break;
case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
ATExecDropColumn(wqueue, rel, cmd->name,
- cmd->behavior, true, false, cmd->missing_ok);
+ cmd->behavior, true, false, cmd->missing_ok, lockmode);
break;
case AT_AddIndex: /* ADD INDEX */
- ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
+ ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false, lockmode);
break;
case AT_ReAddIndex: /* ADD INDEX */
- ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
+ ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true, lockmode);
break;
case AT_AddConstraint: /* ADD CONSTRAINT */
ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def,
- false);
+ false, lockmode);
break;
case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */
ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def,
- true);
+ true, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
false, false,
- cmd->missing_ok);
+ cmd->missing_ok, lockmode);
break;
case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
true, false,
- cmd->missing_ok);
+ cmd->missing_ok, lockmode);
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
- ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
+ ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def, lockmode);
break;
case AT_ChangeOwner: /* ALTER OWNER */
ATExecChangeOwner(RelationGetRelid(rel),
get_roleid_checked(cmd->name),
- false);
+ false, lockmode);
break;
case AT_ClusterOn: /* CLUSTER ON */
- ATExecClusterOn(rel, cmd->name);
+ ATExecClusterOn(rel, cmd->name, lockmode);
break;
case AT_DropCluster: /* SET WITHOUT CLUSTER */
- ATExecDropCluster(rel);
+ ATExecDropCluster(rel, lockmode);
break;
case AT_AddOids: /* SET WITH OIDS */
/* Use the ADD COLUMN code, unless prep decided to do nothing */
if (cmd->def != NULL)
- ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true);
+ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true, lockmode);
break;
case AT_DropOids: /* SET WITHOUT OIDS */
*/
break;
case AT_SetRelOptions: /* SET (...) */
- ATExecSetRelOptions(rel, (List *) cmd->def, false);
+ ATExecSetRelOptions(rel, (List *) cmd->def, false, lockmode);
break;
case AT_ResetRelOptions: /* RESET (...) */
- ATExecSetRelOptions(rel, (List *) cmd->def, true);
+ ATExecSetRelOptions(rel, (List *) cmd->def, true, lockmode);
break;
case AT_EnableTrig: /* ENABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name,
- TRIGGER_FIRES_ON_ORIGIN, false);
+ TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
break;
case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name,
- TRIGGER_FIRES_ALWAYS, false);
+ TRIGGER_FIRES_ALWAYS, false, lockmode);
break;
case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name,
- TRIGGER_FIRES_ON_REPLICA, false);
+ TRIGGER_FIRES_ON_REPLICA, false, lockmode);
break;
case AT_DisableTrig: /* DISABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name,
- TRIGGER_DISABLED, false);
+ TRIGGER_DISABLED, false, lockmode);
break;
case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL,
- TRIGGER_FIRES_ON_ORIGIN, false);
+ TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
break;
case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL,
- TRIGGER_DISABLED, false);
+ TRIGGER_DISABLED, false, lockmode);
break;
case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL,
- TRIGGER_FIRES_ON_ORIGIN, true);
+ TRIGGER_FIRES_ON_ORIGIN, true, lockmode);
break;
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL,
- TRIGGER_DISABLED, true);
+ TRIGGER_DISABLED, true, lockmode);
break;
case AT_EnableRule: /* ENABLE RULE name */
ATExecEnableDisableRule(rel, cmd->name,
- RULE_FIRES_ON_ORIGIN);
+ RULE_FIRES_ON_ORIGIN, lockmode);
break;
case AT_EnableAlwaysRule: /* ENABLE ALWAYS RULE name */
ATExecEnableDisableRule(rel, cmd->name,
- RULE_FIRES_ALWAYS);
+ RULE_FIRES_ALWAYS, lockmode);
break;
case AT_EnableReplicaRule: /* ENABLE REPLICA RULE name */
ATExecEnableDisableRule(rel, cmd->name,
- RULE_FIRES_ON_REPLICA);
+ RULE_FIRES_ON_REPLICA, lockmode);
break;
case AT_DisableRule: /* DISABLE RULE name */
ATExecEnableDisableRule(rel, cmd->name,
- RULE_DISABLED);
+ RULE_DISABLED, lockmode);
break;
case AT_AddInherit:
- ATExecAddInherit(rel, (RangeVar *) cmd->def);
+ ATExecAddInherit(rel, (RangeVar *) cmd->def, lockmode);
break;
case AT_DropInherit:
- ATExecDropInherit(rel, (RangeVar *) cmd->def);
+ ATExecDropInherit(rel, (RangeVar *) cmd->def, lockmode);
break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
* ATRewriteTables: ALTER TABLE phase 3
*/
static void
-ATRewriteTables(List **wqueue)
+ATRewriteTables(List **wqueue, LOCKMODE lockmode)
{
ListCell *ltab;
* modifications, and test the current data within the table
* against new constraints generated by ALTER TABLE commands.
*/
- ATRewriteTable(tab, OIDNewHeap);
+ ATRewriteTable(tab, OIDNewHeap, lockmode);
/*
* Swap the physical files of the old and new heaps, then rebuild
* generated by ALTER TABLE commands, but don't rebuild data.
*/
if (tab->constraints != NIL || tab->new_notnull)
- ATRewriteTable(tab, InvalidOid);
+ ATRewriteTable(tab, InvalidOid, lockmode);
/*
* If we had SET TABLESPACE but no reason to reconstruct tuples,
* just do a block-by-block copy.
*/
if (tab->newTableSpace)
- ATExecSetTableSpace(tab->relid, tab->newTableSpace);
+ ATExecSetTableSpace(tab->relid, tab->newTableSpace, lockmode);
}
}
Relation refrel;
if (rel == NULL)
- {
/* Long since locked, no need for another */
rel = heap_open(tab->relid, NoLock);
- }
- refrel = heap_open(con->refrelid, RowShareLock);
+ /*
+ * We're adding a trigger to both tables, so the lock level
+ * here should sensibly reflect that.
+ */
+ refrel = heap_open(con->refrelid, ShareRowExclusiveLock);
validateForeignKeyConstraint(fkconstraint, rel, refrel,
con->refindid,
* OIDNewHeap is InvalidOid if we don't need to rewrite
*/
static void
-ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
+ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
{
Relation oldrel;
Relation newrel;
newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
if (OidIsValid(OIDNewHeap))
- newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
+ newrel = heap_open(OIDNewHeap, lockmode);
else
newrel = NULL;
*/
static void
ATSimpleRecursion(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse)
+ AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode)
{
/*
* Propagate to children if desired. Non-table relations never have
ListCell *child;
List *children;
- children = find_all_inheritors(relid, AccessExclusiveLock, NULL);
+ children = find_all_inheritors(relid, lockmode, NULL);
/*
* find_all_inheritors does the recursive search of the inheritance
/* find_all_inheritors already got lock */
childrel = relation_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
- ATPrepCmd(wqueue, childrel, cmd, false, true);
+ ATPrepCmd(wqueue, childrel, cmd, false, true, lockmode);
relation_close(childrel, NoLock);
}
}
*/
static void
ATOneLevelRecursion(List **wqueue, Relation rel,
- AlterTableCmd *cmd)
+ AlterTableCmd *cmd, LOCKMODE lockmode)
{
Oid relid = RelationGetRelid(rel);
ListCell *child;
List *children;
- children = find_inheritance_children(relid, AccessExclusiveLock);
+ children = find_inheritance_children(relid, lockmode);
foreach(child, children)
{
/* find_inheritance_children already got lock */
childrel = relation_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
- ATPrepCmd(wqueue, childrel, cmd, true, true);
+ ATPrepCmd(wqueue, childrel, cmd, true, true, lockmode);
relation_close(childrel, NoLock);
}
}
*/
static void
ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
- AlterTableCmd *cmd)
+ AlterTableCmd *cmd, LOCKMODE lockmode)
{
if (rel->rd_rel->reloftype)
ereport(ERROR,
colDefChild->inhcount = 1;
colDefChild->is_local = false;
- ATOneLevelRecursion(wqueue, rel, childCmd);
+ ATOneLevelRecursion(wqueue, rel, childCmd, lockmode);
}
else
{
static void
ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
- ColumnDef *colDef, bool isOid)
+ ColumnDef *colDef, bool isOid, LOCKMODE lockmode)
{
Oid myrelid = RelationGetRelid(rel);
Relation pgclass,
* to cons up a ColumnDef node because the ADD COLUMN code needs one.
*/
static void
-ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd)
+ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOCKMODE lockmode)
{
/* If we're recursing to a child table, the ColumnDef is already set up */
if (cmd->def == NULL)
cdef->storage = 0;
cmd->def = (Node *) cdef;
}
- ATPrepAddColumn(wqueue, rel, recurse, cmd);
+ ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
}
/*
* ALTER TABLE ALTER COLUMN DROP NOT NULL
*/
static void
-ATExecDropNotNull(Relation rel, const char *colName)
+ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
{
HeapTuple tuple;
AttrNumber attnum;
*/
static void
ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName)
+ const char *colName, LOCKMODE lockmode)
{
HeapTuple tuple;
AttrNumber attnum;
*/
static void
ATExecColumnDefault(Relation rel, const char *colName,
- Node *newDefault)
+ Node *newDefault, LOCKMODE lockmode)
{
AttrNumber attnum;
* ALTER TABLE ALTER COLUMN SET STATISTICS
*/
static void
-ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
+ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{
/*
* We do our own permission checking because (a) we want to allow SET
}
static void
-ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
+ATExecSetStatistics(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{
int newtarget;
Relation attrelation;
static void
ATExecSetOptions(Relation rel, const char *colName, Node *options,
- bool isReset)
+ bool isReset, LOCKMODE lockmode)
{
Relation attrelation;
HeapTuple tuple,
* ALTER TABLE ALTER COLUMN SET STORAGE
*/
static void
-ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
+ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{
char *storagemode;
char newstorage;
ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
DropBehavior behavior,
bool recurse, bool recursing,
- bool missing_ok)
+ bool missing_ok, LOCKMODE lockmode)
{
HeapTuple tuple;
Form_pg_attribute targetatt;
* routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass.
*/
- children = find_inheritance_children(RelationGetRelid(rel),
- AccessExclusiveLock);
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
if (children)
{
/* Time to delete this child column, too */
ATExecDropColumn(wqueue, childrel, colName,
behavior, true, true,
- false);
+ false, lockmode);
}
else
{
*/
static void
ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
- IndexStmt *stmt, bool is_rebuild)
+ IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode)
{
bool check_rights;
bool skip_build;
*/
static void
ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
- Constraint *newConstraint, bool recurse)
+ Constraint *newConstraint, bool recurse, LOCKMODE lockmode)
{
Assert(IsA(newConstraint, Constraint));
{
case CONSTR_CHECK:
ATAddCheckConstraint(wqueue, tab, rel,
- newConstraint, recurse, false);
+ newConstraint, recurse, false, lockmode);
break;
case CONSTR_FOREIGN:
RelationGetNamespace(rel),
NIL);
- ATAddForeignKeyConstraint(tab, rel, newConstraint);
+ ATAddForeignKeyConstraint(tab, rel, newConstraint, lockmode);
break;
default:
*/
static void
ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
- Constraint *constr, bool recurse, bool recursing)
+ Constraint *constr, bool recurse, bool recursing,
+ LOCKMODE lockmode)
{
List *newcons;
ListCell *lcon;
* routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass.
*/
- children = find_inheritance_children(RelationGetRelid(rel),
- AccessExclusiveLock);
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
/*
* If we are told not to recurse, there had better not be any child
/* Recurse to child */
ATAddCheckConstraint(wqueue, childtab, childrel,
- constr, recurse, true);
+ constr, recurse, true, lockmode);
heap_close(childrel, NoLock);
}
*/
static void
ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
- Constraint *fkconstraint)
+ Constraint *fkconstraint, LOCKMODE lockmode)
{
Relation pkrel;
int16 pkattnum[INDEX_MAX_KEYS];
Oid indexOid;
Oid constrOid;
- /*
- * Grab an exclusive lock on the pk table, so that someone doesn't delete
- * rows out from under us. (Although a lesser lock would do for that
- * purpose, we'll need exclusive lock anyway to add triggers to the pk
- * table; trying to start with a lesser lock will just create a risk of
- * deadlock.)
- */
- pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
+ pkrel = heap_openrv(fkconstraint->pktable, lockmode);
/*
* Validity checks (permission checks wait till we have the column
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint.
*
- * Caller must have opened and locked both relations.
+ * Caller must have opened and locked both relations appropriately.
*/
static void
validateForeignKeyConstraint(Constraint *fkconstraint,
ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
- bool missing_ok)
+ bool missing_ok, LOCKMODE lockmode)
{
List *children;
ListCell *child;
* use find_all_inheritors to do it in one pass.
*/
if (is_check_constraint)
- children = find_inheritance_children(RelationGetRelid(rel),
- AccessExclusiveLock);
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
else
children = NIL;
/* Time to delete this child constraint, too */
ATExecDropConstraint(childrel, constrName, behavior,
true, true,
- false);
+ false, lockmode);
}
else
{
ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
- AlterTableCmd *cmd)
+ AlterTableCmd *cmd, LOCKMODE lockmode)
{
char *colName = cmd->name;
TypeName *typeName = (TypeName *) cmd->def;
* alter would put them out of step.
*/
if (recurse)
- ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
else if (!recursing &&
find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
ereport(ERROR,
static void
ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
- const char *colName, TypeName *typeName)
+ const char *colName, TypeName *typeName, LOCKMODE lockmode)
{
HeapTuple heapTup;
Form_pg_attribute attTup;
* and constraints that depend on the altered columns.
*/
static void
-ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
+ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
{
ObjectAddress obj;
ListCell *l;
* at the catalogs to detect the existing entry.
*/
foreach(l, tab->changedIndexDefs)
- ATPostAlterTypeParse((char *) lfirst(l), wqueue);
+ ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
foreach(l, tab->changedConstraintDefs)
- ATPostAlterTypeParse((char *) lfirst(l), wqueue);
+ ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
/*
* Now we can drop the existing constraints and indexes --- constraints
}
static void
-ATPostAlterTypeParse(char *cmd, List **wqueue)
+ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
{
List *raw_parsetree_list;
List *querytree_list;
IndexStmt *stmt = (IndexStmt *) stm;
AlterTableCmd *newcmd;
- rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+ rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel);
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_ReAddIndex;
AlterTableStmt *stmt = (AlterTableStmt *) stm;
ListCell *lcmd;
- rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+ rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel);
foreach(lcmd, stmt->cmds)
{
* free-standing composite type.
*/
void
-ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
+ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lockmode)
{
Relation target_rel;
Relation class_rel;
* Get exclusive lock till end of transaction on the target table. Use
* relation_open so that we can work on indexes and sequences.
*/
- target_rel = relation_open(relationOid, AccessExclusiveLock);
+ target_rel = relation_open(relationOid, lockmode);
/* Get its pg_class tuple, too */
class_rel = heap_open(RelationRelationId, RowExclusiveLock);
/* For each index, recursively change its ownership */
foreach(i, index_oid_list)
- ATExecChangeOwner(lfirst_oid(i), newOwnerId, true);
+ ATExecChangeOwner(lfirst_oid(i), newOwnerId, true, lockmode);
list_free(index_oid_list);
}
/* If it has a toast table, recurse to change its ownership */
if (tuple_class->reltoastrelid != InvalidOid)
ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
- true);
+ true, lockmode);
/* If it has dependent sequences, recurse to change them too */
- change_owner_recurse_to_sequences(relationOid, newOwnerId);
+ change_owner_recurse_to_sequences(relationOid, newOwnerId, lockmode);
}
}
* ownership.
*/
static void
-change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
+change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId, LOCKMODE lockmode)
{
Relation depRel;
SysScanDesc scan;
continue;
/* Use relation_open just in case it's an index */
- seqRel = relation_open(depForm->objid, AccessExclusiveLock);
+ seqRel = relation_open(depForm->objid, lockmode);
/* skip non-sequence relations */
if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
{
/* No need to keep the lock */
- relation_close(seqRel, AccessExclusiveLock);
+ relation_close(seqRel, lockmode);
continue;
}
/* We don't need to close the sequence while we alter it. */
- ATExecChangeOwner(depForm->objid, newOwnerId, true);
+ ATExecChangeOwner(depForm->objid, newOwnerId, true, lockmode);
/* Now we can close it. Keep the lock till end of transaction. */
relation_close(seqRel, NoLock);
* The only thing we have to do is to change the indisclustered bits.
*/
static void
-ATExecClusterOn(Relation rel, const char *indexName)
+ATExecClusterOn(Relation rel, const char *indexName, LOCKMODE lockmode)
{
Oid indexOid;
* set and turn it off.
*/
static void
-ATExecDropCluster(Relation rel)
+ATExecDropCluster(Relation rel, LOCKMODE lockmode)
{
mark_index_clustered(rel, InvalidOid);
}
* ALTER TABLE SET TABLESPACE
*/
static void
-ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
+ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename, LOCKMODE lockmode)
{
Oid tablespaceId;
AclResult aclresult;
* ALTER TABLE/INDEX SET (...) or RESET (...)
*/
static void
-ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
+ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode)
{
Oid relid;
Relation pgclass;
Relation toastrel;
Oid toastid = rel->rd_rel->reltoastrelid;
- toastrel = heap_open(toastid, AccessExclusiveLock);
+ toastrel = heap_open(toastid, lockmode);
/* Get the old reloptions */
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
* rewriting to be done, so we just want to copy the data as fast as possible.
*/
static void
-ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
+ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode)
{
Relation rel;
Oid oldTableSpace;
/*
* Need lock here in case we are recursing to toast table or index
*/
- rel = relation_open(tableOid, AccessExclusiveLock);
+ rel = relation_open(tableOid, lockmode);
/*
* No work if no change in tablespace.
/* Move associated toast relation and/or index, too */
if (OidIsValid(reltoastrelid))
- ATExecSetTableSpace(reltoastrelid, newTableSpace);
+ ATExecSetTableSpace(reltoastrelid, newTableSpace, lockmode);
if (OidIsValid(reltoastidxid))
- ATExecSetTableSpace(reltoastidxid, newTableSpace);
+ ATExecSetTableSpace(reltoastidxid, newTableSpace, lockmode);
}
/*
*/
static void
ATExecEnableDisableTrigger(Relation rel, char *trigname,
- char fires_when, bool skip_system)
+ char fires_when, bool skip_system, LOCKMODE lockmode)
{
EnableDisableTrigger(rel, trigname, fires_when, skip_system);
}
*/
static void
ATExecEnableDisableRule(Relation rel, char *trigname,
- char fires_when)
+ char fires_when, LOCKMODE lockmode)
{
EnableDisableRule(rel, trigname, fires_when);
}
}
static void
-ATExecAddInherit(Relation child_rel, RangeVar *parent)
+ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
{
Relation parent_rel,
catalogRelation;
* exactly the same way.
*/
static void
-ATExecDropInherit(Relation rel, RangeVar *parent)
+ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
{
Relation parent_rel;
Relation catalogRelation;
*/
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
- ObjectType stmttype)
+ ObjectType stmttype, LOCKMODE lockmode)
{
Relation rel;
Oid relid;
Oid nspOid;
Relation classRel;
- rel = relation_openrv(relation, AccessExclusiveLock);
+ rel = relation_openrv(relation, lockmode);
relid = RelationGetRelid(rel);
oldNspOid = RelationGetNamespace(rel);
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
- AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema);
+ AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema, lockmode);
AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
}
*/
static void
AlterSeqNamespaces(Relation classRel, Relation rel,
- Oid oldNspOid, Oid newNspOid, const char *newNspName)
+ Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode)
{
Relation depRel;
SysScanDesc scan;
continue;
/* Use relation_open just in case it's an index */
- seqRel = relation_open(depForm->objid, AccessExclusiveLock);
+ seqRel = relation_open(depForm->objid, lockmode);
/* skip non-sequence relations */
if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
{
/* No need to keep the lock */
- relation_close(seqRel, AccessExclusiveLock);
+ relation_close(seqRel, lockmode);
continue;
}