]> granicus.if.org Git - postgresql/commitdiff
Make identity sequence management more robust
authorPeter Eisentraut <peter@eisentraut.org>
Mon, 22 Jul 2019 10:05:03 +0000 (12:05 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Mon, 22 Jul 2019 10:07:10 +0000 (12:07 +0200)
Some code could get confused when certain catalog state involving both
identity and serial sequences was present, perhaps during an attempt
to upgrade the latter to the former.  Specifically, dropping the
default of a serial column maintains the ownership of the sequence by
the column, and so it would then be possible to afterwards make the
column an identity column that would now own two sequences.  This
causes the code that looks up the identity sequence to error out,
making the new identity column inoperable until the ownership of the
previous sequence is released.

To fix this, make the identity sequence lookup only consider sequences
with the appropriate dependency type for an identity sequence, so it
only ever finds one (unless something else is broken).  In the above
example, the old serial sequence would then be ignored.  Reorganize
the various owned-sequence-lookup functions a bit to make this
clearer.

Reported-by: Laurenz Albe <laurenz.albe@cybertec.at>
Discussion: https://www.postgresql.org/message-id/flat/470c54fc8590be4de0f41b0d295fd6390d5e8a6c.camel@cybertec.at

src/backend/catalog/pg_depend.c
src/backend/commands/tablecmds.c
src/backend/parser/parse_utilcmd.c
src/backend/rewrite/rewriteHandler.c
src/include/catalog/dependency.h
src/test/regress/expected/identity.out
src/test/regress/sql/identity.sql

index 4116e93b64ba2ecdc8204ae0e61b80d8a5469937..a060c25d2eeafe22cfdb01ab5cf5406e50fb56b3 100644 (file)
@@ -705,10 +705,11 @@ sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId)
 
 /*
  * Collect a list of OIDs of all sequences owned by the specified relation,
- * and column if specified.
+ * and column if specified.  If deptype is not zero, then only find sequences
+ * with the specified dependency type.
  */
-List *
-getOwnedSequences(Oid relid, AttrNumber attnum)
+static List *
+getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype)
 {
        List       *result = NIL;
        Relation        depRel;
@@ -750,7 +751,8 @@ getOwnedSequences(Oid relid, AttrNumber attnum)
                        (deprec->deptype == DEPENDENCY_AUTO || deprec->deptype == DEPENDENCY_INTERNAL) &&
                        get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
                {
-                       result = lappend_oid(result, deprec->objid);
+                       if (!deptype || deprec->deptype == deptype)
+                               result = lappend_oid(result, deprec->objid);
                }
        }
 
@@ -762,17 +764,32 @@ getOwnedSequences(Oid relid, AttrNumber attnum)
 }
 
 /*
- * Get owned sequence, error if not exactly one.
+ * Collect a list of OIDs of all sequences owned (identity or serial) by the
+ * specified relation.
+ */
+List *
+getOwnedSequences(Oid relid)
+{
+       return getOwnedSequences_internal(relid, 0, 0);
+}
+
+/*
+ * Get owned identity sequence, error if not exactly one.
  */
 Oid
-getOwnedSequence(Oid relid, AttrNumber attnum)
+getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok)
 {
-       List       *seqlist = getOwnedSequences(relid, attnum);
+       List       *seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
 
        if (list_length(seqlist) > 1)
                elog(ERROR, "more than one owned sequence found");
        else if (list_length(seqlist) < 1)
-               elog(ERROR, "no owned sequence found");
+       {
+               if (missing_ok)
+                       return InvalidOid;
+               else
+                       elog(ERROR, "no owned sequence found");
+       }
 
        return linitial_oid(seqlist);
 }
index fc1c4dfa4caebccfb1f8962a8ea45919c9a29bc7..f39f993e01e4b99a376ae66d694d5f8039b01e6c 100644 (file)
@@ -1689,7 +1689,7 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
                foreach(cell, rels)
                {
                        Relation        rel = (Relation) lfirst(cell);
-                       List       *seqlist = getOwnedSequences(RelationGetRelid(rel), 0);
+                       List       *seqlist = getOwnedSequences(RelationGetRelid(rel));
                        ListCell   *seqcell;
 
                        foreach(seqcell, seqlist)
@@ -6635,7 +6635,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE
        table_close(attrelation, RowExclusiveLock);
 
        /* drop the internal sequence */
-       seqid = getOwnedSequence(RelationGetRelid(rel), attnum);
+       seqid = getIdentitySequence(RelationGetRelid(rel), attnum, false);
        deleteDependencyRecordsForClass(RelationRelationId, seqid,
                                                                        RelationRelationId, DEPENDENCY_INTERNAL);
        CommandCounterIncrement();
index f0f6ee7db6ba0691d66d9bc0e888dcf9b58a2caa..a9b2f8bacd807d49241b2bd451a1bc903da7f1e6 100644 (file)
@@ -1083,7 +1083,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                         * find sequence owned by old column; extract sequence parameters;
                         * build new create sequence command
                         */
-                       seq_relid = getOwnedSequence(RelationGetRelid(relation), attribute->attnum);
+                       seq_relid = getIdentitySequence(RelationGetRelid(relation), attribute->attnum, false);
                        seq_options = sequence_options(seq_relid);
                        generateSerialExtraStmts(cxt, def,
                                                                         InvalidOid, seq_options, true,
@@ -3165,7 +3165,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
                                        if (attnum != InvalidAttrNumber &&
                                                TupleDescAttr(tupdesc, attnum - 1)->attidentity)
                                        {
-                                               Oid                     seq_relid = getOwnedSequence(relid, attnum);
+                                               Oid                     seq_relid = getIdentitySequence(relid, attnum, false);
                                                Oid                     typeOid = typenameTypeId(pstate, def->typeName);
                                                AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
 
@@ -3216,7 +3216,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
                                        ListCell   *lc;
                                        List       *newseqopts = NIL;
                                        List       *newdef = NIL;
-                                       List       *seqlist;
                                        AttrNumber      attnum;
 
                                        /*
@@ -3237,14 +3236,13 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
 
                                        if (attnum)
                                        {
-                                               seqlist = getOwnedSequences(relid, attnum);
-                                               if (seqlist)
+                                               Oid                     seq_relid = getIdentitySequence(relid, attnum, true);
+
+                                               if (seq_relid)
                                                {
                                                        AlterSeqStmt *seqstmt;
-                                                       Oid                     seq_relid;
 
                                                        seqstmt = makeNode(AlterSeqStmt);
-                                                       seq_relid = linitial_oid(seqlist);
                                                        seqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
                                                                                                                         get_rel_name(seq_relid), -1);
                                                        seqstmt->options = newseqopts;
index 93b678402328278c9a57aa4d9c872d429086eb56..c96ef8595affaa45cf17f3e9752019efac0fd7a7 100644 (file)
@@ -1130,7 +1130,7 @@ build_column_default(Relation rel, int attrno)
        {
                NextValueExpr *nve = makeNode(NextValueExpr);
 
-               nve->seqid = getOwnedSequence(RelationGetRelid(rel), attrno);
+               nve->seqid = getIdentitySequence(RelationGetRelid(rel), attrno, false);
                nve->typeId = att_tup->atttypid;
 
                return (Node *) nve;
index ef9c86864cd60baf230a4d3e8bdf81d37ac31816..12296b61309dd9673458e3907d567aa4ef799f92 100644 (file)
@@ -209,8 +209,8 @@ extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
 extern Oid     getExtensionOfObject(Oid classId, Oid objectId);
 
 extern bool sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId);
-extern List *getOwnedSequences(Oid relid, AttrNumber attnum);
-extern Oid     getOwnedSequence(Oid relid, AttrNumber attnum);
+extern List *getOwnedSequences(Oid relid);
+extern Oid     getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok);
 
 extern Oid     get_constraint_index(Oid constraintId);
 
index 2286519b0cae52a27cb3c75a92172d90ee75bd34..36a239363a93f7b5eeb1fdde396efc528f65aaaf 100644 (file)
@@ -399,3 +399,8 @@ CREATE TABLE itest_child PARTITION OF itest_parent (
 ) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error
 ERROR:  identity columns are not supported on partitions
 DROP TABLE itest_parent;
+-- test that sequence of half-dropped serial column is properly ignored
+CREATE TABLE itest14 (id serial);
+ALTER TABLE itest14 ALTER id DROP DEFAULT;
+ALTER TABLE itest14 ALTER id ADD GENERATED BY DEFAULT AS IDENTITY;
+INSERT INTO itest14 (id) VALUES (DEFAULT);
index 8dcfdf3dc1510097e613576254b7698d843ffceb..4b03d24effe70414ac1df60b7593f2a287151cb9 100644 (file)
@@ -254,3 +254,11 @@ CREATE TABLE itest_child PARTITION OF itest_parent (
     f3 WITH OPTIONS GENERATED ALWAYS AS IDENTITY
 ) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error
 DROP TABLE itest_parent;
+
+
+-- test that sequence of half-dropped serial column is properly ignored
+
+CREATE TABLE itest14 (id serial);
+ALTER TABLE itest14 ALTER id DROP DEFAULT;
+ALTER TABLE itest14 ALTER id ADD GENERATED BY DEFAULT AS IDENTITY;
+INSERT INTO itest14 (id) VALUES (DEFAULT);