From a2597ef17958e75e7ba26507dc407249cc9e7134 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 22 May 2002 21:40:55 +0000 Subject: [PATCH] Modify sequence state storage to eliminate dangling-pointer problem exemplified by bug #671. Moving the storage to relcache turned out to be a bad idea because relcache might decide to discard the info. Instead, open and close the relcache entry on each sequence operation, and use a record of the current XID to discover whether we already hold AccessShareLock on the sequence. --- src/backend/access/transam/xact.c | 5 +- src/backend/commands/define.c | 31 +++- src/backend/commands/sequence.c | 248 ++++++++++++++---------------- src/include/commands/defrem.h | 5 +- src/include/commands/sequence.h | 3 +- 5 files changed, 153 insertions(+), 139 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 10de2f8a6d..b874b4790a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.123 2002/05/21 22:05:53 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.124 2002/05/22 21:40:55 tgl Exp $ * * NOTES * Transaction aborts can now occur two ways: @@ -166,7 +166,6 @@ #include "catalog/index.h" #include "catalog/namespace.h" #include "commands/async.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include "executor/spi.h" #include "libpq/be-fsstubs.h" @@ -947,7 +946,6 @@ CommitTransaction(void) /* NOTIFY commit must also come before lower-level cleanup */ AtCommit_Notify(); - CloseSequences(); AtEOXact_portals(); /* Here is where we really truly commit. */ @@ -1063,7 +1061,6 @@ AbortTransaction(void) DeferredTriggerAbortXact(); lo_commit(false); /* 'false' means it's abort */ AtAbort_Notify(); - CloseSequences(); AtEOXact_portals(); /* Advertise the fact that we aborted in pg_clog. */ diff --git a/src/backend/commands/define.c b/src/backend/commands/define.c index 889ddd0f44..20d2a61de4 100644 --- a/src/backend/commands/define.c +++ b/src/backend/commands/define.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.76 2002/04/15 05:22:03 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.77 2002/05/22 21:40:55 tgl Exp $ * * DESCRIPTION * The "DefineFoo" routines take the parse tree and pick out the @@ -37,6 +37,7 @@ #include "commands/defrem.h" #include "parser/parse_type.h" +#include "utils/int8.h" /* @@ -114,6 +115,34 @@ defGetNumeric(DefElem *def) return 0; /* keep compiler quiet */ } +/* + * Extract an int64 value from a DefElem. + */ +int64 +defGetInt64(DefElem *def) +{ + if (def->arg == NULL) + elog(ERROR, "Define: \"%s\" requires a numeric value", + def->defname); + switch (nodeTag(def->arg)) + { + case T_Integer: + return (int64) intVal(def->arg); + case T_Float: + /* + * Values too large for int4 will be represented as Float + * constants by the lexer. Accept these if they are valid int8 + * strings. + */ + return DatumGetInt64(DirectFunctionCall1(int8in, + CStringGetDatum(strVal(def->arg)))); + default: + elog(ERROR, "Define: \"%s\" requires a numeric value", + def->defname); + } + return 0; /* keep compiler quiet */ +} + /* * Extract a possibly-qualified name (as a List of Strings) from a DefElem. */ diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index c79e6f97e1..8d90c81d3c 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.78 2002/05/21 22:05:54 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.79 2002/05/22 21:40:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,16 +17,14 @@ #include "access/heapam.h" #include "catalog/namespace.h" #include "catalog/pg_type.h" +#include "commands/defrem.h" #include "commands/tablecmds.h" #include "commands/sequence.h" #include "miscadmin.h" #include "utils/acl.h" #include "utils/builtins.h" -#include "utils/int8.h" -#define SEQ_MAGIC 0x1717 - #ifndef INT64_IS_BUSTED #ifdef HAVE_LL_CONSTANTS #define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL) @@ -46,29 +44,47 @@ */ #define SEQ_LOG_VALS 32 +/* + * The "special area" of a sequence's buffer page looks like this. + */ +#define SEQ_MAGIC 0x1717 + typedef struct sequence_magic { uint32 magic; } sequence_magic; +/* + * We store a SeqTable item for every sequence we have touched in the current + * session. This is needed to hold onto nextval/currval state. (We can't + * rely on the relcache, since it's only, well, a cache, and may decide to + * discard entries.) + * + * XXX We use linear search to find pre-existing SeqTable entries. This is + * good when only a small number of sequences are touched in a session, but + * would suck with many different sequences. Perhaps use a hashtable someday. + */ typedef struct SeqTableData { - Oid relid; - Relation rel; /* NULL if rel is not open in cur xact */ - int64 cached; - int64 last; - int64 increment; - struct SeqTableData *next; + struct SeqTableData *next; /* link to next SeqTable object */ + Oid relid; /* pg_class OID of this sequence */ + TransactionId xid; /* xact in which we last did a seq op */ + int64 last; /* value last returned by nextval */ + int64 cached; /* last value already cached for nextval */ + /* if last != cached, we have not used up all the cached values */ + int64 increment; /* copy of sequence's increment field */ } SeqTableData; typedef SeqTableData *SeqTable; -static SeqTable seqtab = NULL; +static SeqTable seqtab = NULL; /* Head of list of SeqTable items */ -static SeqTable init_sequence(char *caller, RangeVar *relation); -static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf); + +static void init_sequence(const char *caller, RangeVar *relation, + SeqTable *p_elm, Relation *p_rel); +static Form_pg_sequence read_info(const char *caller, SeqTable elm, + Relation rel, Buffer *buf); static void init_params(CreateSeqStmt *seq, Form_pg_sequence new); -static int64 get_param(DefElem *def); static void do_setval(RangeVar *sequence, int64 next, bool iscalled); /* @@ -281,6 +297,7 @@ nextval(PG_FUNCTION_ARGS) text *seqin = PG_GETARG_TEXT_P(0); RangeVar *sequence; SeqTable elm; + Relation seqrel; Buffer buf; Page page; Form_pg_sequence seq; @@ -300,7 +317,7 @@ nextval(PG_FUNCTION_ARGS) "nextval")); /* open and AccessShareLock sequence */ - elm = init_sequence("nextval", sequence); + init_sequence("nextval", sequence, &elm, &seqrel); if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK) elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s", @@ -309,11 +326,12 @@ nextval(PG_FUNCTION_ARGS) if (elm->last != elm->cached) /* some numbers were cached */ { elm->last += elm->increment; + relation_close(seqrel, NoLock); PG_RETURN_INT64(elm->last); } - seq = read_info("nextval", elm, &buf); /* lock page' buffer and - * read tuple */ + /* lock page' buffer and read tuple */ + seq = read_info("nextval", elm, seqrel, &buf); page = BufferGetPage(buf); last = next = result = seq->last_value; @@ -421,7 +439,7 @@ nextval(PG_FUNCTION_ARGS) XLogRecPtr recptr; XLogRecData rdata[2]; - xlrec.node = elm->rel->rd_node; + xlrec.node = seqrel->rd_node; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(xl_seq_rec); @@ -453,6 +471,8 @@ nextval(PG_FUNCTION_ARGS) if (WriteBuffer(buf) == STATUS_ERROR) elog(ERROR, "%s.nextval: WriteBuffer failed", sequence->relname); + relation_close(seqrel, NoLock); + PG_RETURN_INT64(result); } @@ -462,13 +482,14 @@ currval(PG_FUNCTION_ARGS) text *seqin = PG_GETARG_TEXT_P(0); RangeVar *sequence; SeqTable elm; + Relation seqrel; int64 result; sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin, "currval")); /* open and AccessShareLock sequence */ - elm = init_sequence("currval", sequence); + init_sequence("currval", sequence, &elm, &seqrel); if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK) elog(ERROR, "%s.currval: you don't have permissions to read sequence %s", @@ -480,6 +501,8 @@ currval(PG_FUNCTION_ARGS) result = elm->last; + relation_close(seqrel, NoLock); + PG_RETURN_INT64(result); } @@ -500,18 +523,19 @@ static void do_setval(RangeVar *sequence, int64 next, bool iscalled) { SeqTable elm; + Relation seqrel; Buffer buf; Form_pg_sequence seq; /* open and AccessShareLock sequence */ - elm = init_sequence("setval", sequence); + init_sequence("setval", sequence, &elm, &seqrel); if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK) elog(ERROR, "%s.setval: you don't have permissions to set sequence %s", sequence->relname, sequence->relname); /* lock page' buffer and read tuple */ - seq = read_info("setval", elm, &buf); + seq = read_info("setval", elm, seqrel, &buf); if ((next < seq->min_value) || (next > seq->max_value)) elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")", @@ -529,7 +553,7 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled) XLogRecData rdata[2]; Page page = BufferGetPage(buf); - xlrec.node = elm->rel->rd_node; + xlrec.node = seqrel->rd_node; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(xl_seq_rec); @@ -559,6 +583,8 @@ do_setval(RangeVar *sequence, int64 next, bool iscalled) if (WriteBuffer(buf) == STATUS_ERROR) elog(ERROR, "%s.setval: WriteBuffer failed", sequence->relname); + + relation_close(seqrel, NoLock); } /* @@ -600,84 +626,48 @@ setval_and_iscalled(PG_FUNCTION_ARGS) PG_RETURN_INT64(next); } -static Form_pg_sequence -read_info(char *caller, SeqTable elm, Buffer *buf) -{ - PageHeader page; - ItemId lp; - HeapTupleData tuple; - sequence_magic *sm; - Form_pg_sequence seq; - if (elm->rel->rd_nblocks > 1) - elog(ERROR, "%s.%s: invalid number of blocks in sequence", - RelationGetRelationName(elm->rel), caller); - - *buf = ReadBuffer(elm->rel, 0); - if (!BufferIsValid(*buf)) - elog(ERROR, "%s.%s: ReadBuffer failed", - RelationGetRelationName(elm->rel), caller); - - LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE); - - page = (PageHeader) BufferGetPage(*buf); - sm = (sequence_magic *) PageGetSpecialPointer(page); - - if (sm->magic != SEQ_MAGIC) - elog(ERROR, "%s.%s: bad magic (%08X)", - RelationGetRelationName(elm->rel), caller, sm->magic); - - lp = PageGetItemId(page, FirstOffsetNumber); - Assert(ItemIdIsUsed(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); - - seq = (Form_pg_sequence) GETSTRUCT(&tuple); - - elm->increment = seq->increment_by; - - return seq; -} - - -static SeqTable -init_sequence(char *caller, RangeVar *relation) +/* + * Given a relation name, open and lock the sequence. p_elm and p_rel are + * output parameters. + */ +static void +init_sequence(const char *caller, RangeVar *relation, + SeqTable *p_elm, Relation *p_rel) { Oid relid = RangeVarGetRelid(relation, false); - SeqTable elm, - prev = (SeqTable) NULL; + TransactionId thisxid = GetCurrentTransactionId(); + SeqTable elm; Relation seqrel; /* Look to see if we already have a seqtable entry for relation */ - for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next) + for (elm = seqtab; elm != NULL; elm = elm->next) { if (elm->relid == relid) break; - prev = elm; } - /* If so, and if it's already been opened in this xact, just return it */ - if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL) - return elm; + /* + * Open the sequence relation, acquiring AccessShareLock if we don't + * already have a lock in the current xact. + */ + if (elm == NULL || elm->xid != thisxid) + seqrel = relation_open(relid, AccessShareLock); + else + seqrel = relation_open(relid, NoLock); - /* Else open and check it */ - seqrel = heap_open(relid, AccessShareLock); if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE) elog(ERROR, "%s.%s: %s is not a sequence", relation->relname, caller, relation->relname); /* - * If elm exists but elm->rel is NULL, the seqtable entry is left over - * from a previous xact -- update the entry and reuse it. + * Allocate new seqtable entry if we didn't find one. * * NOTE: seqtable entries remain in the list for the life of a backend. * If the sequence itself is deleted then the entry becomes wasted memory, * but it's small enough that this should not matter. */ - if (elm != (SeqTable) NULL) - { - elm->rel = seqrel; - } - else + if (elm == NULL) { /* * Time to make a new seqtable entry. These entries live as long @@ -686,40 +676,59 @@ init_sequence(char *caller, RangeVar *relation) elm = (SeqTable) malloc(sizeof(SeqTableData)); if (elm == NULL) elog(ERROR, "Memory exhausted in init_sequence"); - elm->rel = seqrel; elm->relid = relid; - elm->cached = elm->last = elm->increment = 0; - elm->next = (SeqTable) NULL; - - if (seqtab == (SeqTable) NULL) - seqtab = elm; - else - prev->next = elm; + /* increment is set to 0 until we do read_info (see currval) */ + elm->last = elm->cached = elm->increment = 0; + elm->next = seqtab; + seqtab = elm; } - return elm; + /* Flag that we have a lock in the current xact. */ + elm->xid = thisxid; + + *p_elm = elm; + *p_rel = seqrel; } -/* - * CloseSequences - * is called by xact mgr at commit/abort. - */ -void -CloseSequences(void) +/* Given an opened relation, lock the page buffer and find the tuple */ +static Form_pg_sequence +read_info(const char *caller, SeqTable elm, + Relation rel, Buffer *buf) { - SeqTable elm; - Relation rel; + PageHeader page; + ItemId lp; + HeapTupleData tuple; + sequence_magic *sm; + Form_pg_sequence seq; - for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next) - { - rel = elm->rel; - if (rel != (Relation) NULL) /* opened in current xact */ - { - elm->rel = (Relation) NULL; - heap_close(rel, AccessShareLock); - } - } + if (rel->rd_nblocks > 1) + elog(ERROR, "%s.%s: invalid number of blocks in sequence", + RelationGetRelationName(rel), caller); + + *buf = ReadBuffer(rel, 0); + if (!BufferIsValid(*buf)) + elog(ERROR, "%s.%s: ReadBuffer failed", + RelationGetRelationName(rel), caller); + + LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE); + + page = (PageHeader) BufferGetPage(*buf); + sm = (sequence_magic *) PageGetSpecialPointer(page); + + if (sm->magic != SEQ_MAGIC) + elog(ERROR, "%s.%s: bad magic (%08X)", + RelationGetRelationName(rel), caller, sm->magic); + + lp = PageGetItemId(page, FirstOffsetNumber); + Assert(ItemIdIsUsed(lp)); + tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); + + seq = (Form_pg_sequence) GETSTRUCT(&tuple); + + elm->increment = seq->increment_by; + + return seq; } @@ -761,7 +770,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) if (increment_by == (DefElem *) NULL) /* INCREMENT BY */ new->increment_by = 1; - else if ((new->increment_by = get_param(increment_by)) == 0) + else if ((new->increment_by = defGetInt64(increment_by)) == 0) elog(ERROR, "DefineSequence: can't INCREMENT by 0"); if (max_value == (DefElem *) NULL) /* MAXVALUE */ @@ -772,7 +781,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) new->max_value = -1; /* descending seq */ } else - new->max_value = get_param(max_value); + new->max_value = defGetInt64(max_value); if (min_value == (DefElem *) NULL) /* MINVALUE */ { @@ -782,7 +791,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) new->min_value = SEQ_MINVALUE; /* descending seq */ } else - new->min_value = get_param(min_value); + new->min_value = defGetInt64(min_value); if (new->min_value >= new->max_value) elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")", @@ -796,7 +805,7 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) new->last_value = new->max_value; /* descending seq */ } else - new->last_value = get_param(last_value); + new->last_value = defGetInt64(last_value); if (new->last_value < new->min_value) elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")", @@ -807,33 +816,12 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new) if (cache_value == (DefElem *) NULL) /* CACHE */ new->cache_value = 1; - else if ((new->cache_value = get_param(cache_value)) <= 0) + else if ((new->cache_value = defGetInt64(cache_value)) <= 0) elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0", new->cache_value); } -static int64 -get_param(DefElem *def) -{ - if (def->arg == (Node *) NULL) - elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname); - - if (IsA(def->arg, Integer)) - return (int64) intVal(def->arg); - - /* - * Values too large for int4 will be represented as Float constants by - * the lexer. Accept these if they are valid int8 strings. - */ - if (IsA(def->arg, Float)) - return DatumGetInt64(DirectFunctionCall1(int8in, - CStringGetDatum(strVal(def->arg)))); - - /* Shouldn't get here unless parser messed up */ - elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname); - return 0; /* not reached; keep compiler quiet */ -} void seq_redo(XLogRecPtr lsn, XLogRecord *record) diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index cf9048c714..5ebead414d 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: defrem.h,v 1.37 2002/05/17 18:32:52 petere Exp $ + * $Id: defrem.h,v 1.38 2002/05/22 21:40:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -54,12 +54,13 @@ extern void DefineDomain(CreateDomainStmt *stmt); extern void RemoveDomain(List *names, int behavior); -/* support routines in define.c */ +/* support routines in commands/define.c */ extern void case_translate_language_name(const char *input, char *output); extern char *defGetString(DefElem *def); extern double defGetNumeric(DefElem *def); +extern int64 defGetInt64(DefElem *def); extern List *defGetQualifiedName(DefElem *def); extern TypeName *defGetTypeName(DefElem *def); extern int defGetTypeLength(DefElem *def); diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 48c0673cdf..e91523ac47 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: sequence.h,v 1.19 2001/11/05 17:46:33 momjian Exp $ + * $Id: sequence.h,v 1.20 2002/05/22 21:40:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -84,7 +84,6 @@ extern Datum setval(PG_FUNCTION_ARGS); extern Datum setval_and_iscalled(PG_FUNCTION_ARGS); extern void DefineSequence(CreateSeqStmt *stmt); -extern void CloseSequences(void); extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr); -- 2.40.0