From a4a7eb37d4c067f3d8d76338cda1bb33722e50ec Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 25 Jul 2012 17:40:43 -0400 Subject: [PATCH] Fix longstanding crash-safety bug with newly-created-or-reset sequences. If a crash occurred immediately after the first nextval() call for a serial column, WAL replay would restore the sequence to a state in which it appeared that no nextval() had been done, thus allowing the first sequence value to be returned again by the next nextval() call; as reported in bug #6748 from Xiangming Mei. More generally, the problem would occur if an ALTER SEQUENCE was executed on a freshly created or reset sequence. (The manifestation with serial columns was introduced in 8.2 when we added an ALTER SEQUENCE OWNED BY step to serial column creation.) The cause is that sequence creation attempted to save one WAL entry by writing out a WAL record that made it appear that the first nextval() had already happened (viz, with is_called = true), while marking the sequence's in-database state with log_cnt = 1 to show that the first nextval() need not emit a WAL record. However, ALTER SEQUENCE would emit a new WAL entry reflecting the actual in-database state (with is_called = false). Then, nextval would allocate the first sequence value and set is_called = true, but it would trust the log_cnt value and not emit any WAL record. A crash at this point would thus restore the sequence to its post-ALTER state, causing the next nextval() call to return the first sequence value again. To fix, get rid of the idea of logging an is_called status different from reality. This means that the first nextval-driven WAL record will happen at the first nextval call not the second, but the marginal cost of that is pretty negligible. In addition, make sure that ALTER SEQUENCE resets log_cnt to zero in any case where it touches sequence parameters that affect future nextval results. This will result in some user-visible changes in the contents of a sequence's log_cnt column, as reflected in the patch's regression test changes; but no application should be depending on that anyway, since it was already true that log_cnt changes rather unpredictably depending on checkpoint timing. In addition, make some basically-cosmetic improvements to get rid of sequence.c's undesirable intimacy with page layout details. It was always really trying to WAL-log the contents of the sequence tuple, so we should have it do that directly using a HeapTuple's t_data and t_len, rather than backing into it with some magic assumptions about where the tuple would be on the sequence's page. Back-patch to all supported branches. --- src/backend/commands/sequence.c | 165 +++++++++++++---------- src/test/regress/expected/sequence.out | 4 +- src/test/regress/expected/sequence_1.out | 7 +- 3 files changed, 102 insertions(+), 74 deletions(-) diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 34b74f6c38..fbb6489915 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -38,7 +38,7 @@ /* * We don't want to log each fetching of a value from a sequence, * so we pre-log a few fetches in advance. In the event of - * crash we can lose as much as we pre-logged. + * crash we can lose (skip over) as many values as we pre-logged. */ #define SEQ_LOG_VALS 32 @@ -73,7 +73,7 @@ typedef struct SeqTableData int64 cached; /* last value already cached for nextval */ /* if last != cached, we have not used up all the cached values */ int64 increment; /* copy of sequence's increment field */ - /* note that increment is zero until we first do read_info() */ + /* note that increment is zero until we first do read_seq_tuple() */ } SeqTableData; typedef SeqTableData *SeqTable; @@ -90,7 +90,8 @@ static void fill_seq_with_data(Relation rel, HeapTuple tuple); static int64 nextval_internal(Oid relid); static Relation open_share_lock(SeqTable seq); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); -static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf); +static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel, + Buffer *buf, HeapTuple seqtuple); static void init_params(List *options, bool isInit, Form_pg_sequence new, List **owned_by); static void do_setval(Oid relid, int64 next, bool iscalled); @@ -187,7 +188,7 @@ DefineSequence(CreateSeqStmt *seq) case SEQ_COL_LOG: coldef->typeName = makeTypeNameFromOid(INT8OID, -1); coldef->colname = "log_cnt"; - value[i - 1] = Int64GetDatum((int64) 1); + value[i - 1] = Int64GetDatum((int64) 0); break; case SEQ_COL_CYCLE: coldef->typeName = makeTypeNameFromOid(BOOLOID, -1); @@ -247,10 +248,8 @@ ResetSequence(Oid seq_relid) SeqTable elm; Form_pg_sequence seq; Buffer buf; - Page page; + HeapTupleData seqtuple; HeapTuple tuple; - HeapTupleData tupledata; - ItemId lp; /* * Read the old sequence. This does a bit more work than really @@ -258,18 +257,12 @@ ResetSequence(Oid seq_relid) * indeed a sequence. */ init_sequence(seq_relid, &elm, &seq_rel); - read_info(elm, seq_rel, &buf); + (void) read_seq_tuple(elm, seq_rel, &buf, &seqtuple); /* * Copy the existing sequence tuple. */ - page = BufferGetPage(buf); - lp = PageGetItemId(page, FirstOffsetNumber); - Assert(ItemIdIsNormal(lp)); - - tupledata.t_data = (HeapTupleHeader) PageGetItem(page, lp); - tupledata.t_len = ItemIdGetLength(lp); - tuple = heap_copytuple(&tupledata); + tuple = heap_copytuple(&seqtuple); /* Now we're done with the old page */ UnlockReleaseBuffer(buf); @@ -281,7 +274,7 @@ ResetSequence(Oid seq_relid) seq = (Form_pg_sequence) GETSTRUCT(tuple); seq->last_value = seq->start_value; seq->is_called = false; - seq->log_cnt = 1; + seq->log_cnt = 0; /* * Create a new storage file for the sequence. We want to keep the @@ -378,12 +371,6 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; - Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple); - - /* We do not log first nextval call, so "advance" sequence here */ - /* Note we are scribbling on local tuple, not the disk buffer */ - newseq->is_called = true; - newseq->log_cnt = 0; xlrec.node = rel->rd_node; rdata[0].data = (char *) &xlrec; @@ -419,7 +406,7 @@ AlterSequence(AlterSeqStmt *stmt) SeqTable elm; Relation seqrel; Buffer buf; - Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; FormData_pg_sequence new; List *owned_by; @@ -442,8 +429,7 @@ AlterSequence(AlterSeqStmt *stmt) stmt->sequence->relname); /* lock page' buffer and read tuple into new sequence structure */ - seq = read_info(elm, seqrel, &buf); - page = BufferGetPage(buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); /* Copy old values of options into workspace */ memcpy(&new, seq, sizeof(FormData_pg_sequence)); @@ -456,10 +442,10 @@ AlterSequence(AlterSeqStmt *stmt) elm->cached = elm->last; /* Now okay to update the on-disk tuple */ - memcpy(seq, &new, sizeof(FormData_pg_sequence)); - START_CRIT_SECTION(); + memcpy(seq, &new, sizeof(FormData_pg_sequence)); + MarkBufferDirty(buf); /* XLOG stuff */ @@ -468,6 +454,7 @@ AlterSequence(AlterSeqStmt *stmt) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; + Page page = BufferGetPage(buf); xlrec.node = seqrel->rd_node; rdata[0].data = (char *) &xlrec; @@ -475,9 +462,8 @@ AlterSequence(AlterSeqStmt *stmt) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -541,6 +527,7 @@ nextval_internal(Oid relid) Relation seqrel; Buffer buf; Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; int64 incby, maxv, @@ -579,7 +566,7 @@ nextval_internal(Oid relid) } /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); page = BufferGetPage(buf); last = next = result = seq->last_value; @@ -591,9 +578,8 @@ nextval_internal(Oid relid) if (!seq->is_called) { - rescnt++; /* last_value if not called */ + rescnt++; /* return last_value if not is_called */ fetch--; - log--; } /* @@ -606,7 +592,7 @@ nextval_internal(Oid relid) * checkpoint would fail to advance the sequence past the logged values. * In this case we may as well fetch extra values. */ - if (log < fetch) + if (log < fetch || !seq->is_called) { /* forced log to satisfy local demand for values */ fetch = log = fetch + SEQ_LOG_VALS; @@ -697,8 +683,18 @@ nextval_internal(Oid relid) last_used_seq = elm; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + /* + * We must mark the buffer dirty before doing XLogInsert(); see notes in + * SyncOneBuffer(). However, we don't apply the desired changes just yet. + * This looks like a violation of the buffer update protocol, but it is + * in fact safe because we hold exclusive lock on the buffer. Any other + * process, including a checkpoint, that tries to examine the buffer + * contents will block until we release the lock, and then will see the + * final state that we install below. + */ MarkBufferDirty(buf); /* XLOG stuff */ @@ -708,20 +704,26 @@ nextval_internal(Oid relid) XLogRecPtr recptr; XLogRecData rdata[2]; - xlrec.node = seqrel->rd_node; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(xl_seq_rec); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + /* + * We don't log the current state of the tuple, but rather the state + * as it would appear after "log" more fetches. This lets us skip + * that many future WAL records, at the cost that we lose those + * sequence values if we crash. + */ /* set values that will be saved in xlog */ seq->last_value = next; seq->is_called = true; seq->log_cnt = 0; - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + xlrec.node = seqrel->rd_node; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_seq_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -731,7 +733,7 @@ nextval_internal(Oid relid) PageSetTLI(page, ThisTimeLineID); } - /* update on-disk data */ + /* Now update sequence tuple to the intended final state */ seq->last_value = last; /* last fetched number */ seq->is_called = true; seq->log_cnt = log; /* how much is logged */ @@ -830,6 +832,7 @@ do_setval(Oid relid, int64 next, bool iscalled) SeqTable elm; Relation seqrel; Buffer buf; + HeapTupleData seqtuple; Form_pg_sequence seq; /* open and AccessShareLock sequence */ @@ -846,7 +849,7 @@ do_setval(Oid relid, int64 next, bool iscalled) PreventCommandIfReadOnly("setval()"); /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); if ((next < seq->min_value) || (next > seq->max_value)) { @@ -874,8 +877,13 @@ do_setval(Oid relid, int64 next, bool iscalled) /* In any case, forget any future cached numbers */ elm->cached = elm->last; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + seq->last_value = next; /* last fetched number */ + seq->is_called = iscalled; + seq->log_cnt = 0; + MarkBufferDirty(buf); /* XLOG stuff */ @@ -892,14 +900,8 @@ do_setval(Oid relid, int64 next, bool iscalled) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - /* set values that will be saved in xlog */ - seq->last_value = next; - seq->is_called = true; - seq->log_cnt = 0; - - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -909,11 +911,6 @@ do_setval(Oid relid, int64 next, bool iscalled) PageSetTLI(page, ThisTimeLineID); } - /* save info in sequence relation */ - seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = (iscalled) ? 0 : 1; - END_CRIT_SECTION(); UnlockReleaseBuffer(buf); @@ -1066,13 +1063,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel) } -/* Given an opened relation, lock the page buffer and find the tuple */ +/* + * Given an opened sequence relation, lock the page buffer and find the tuple + * + * *buf receives the reference to the pinned-and-ex-locked buffer + * *seqtuple receives the reference to the sequence tuple proper + * (this arg should point to a local variable of type HeapTupleData) + * + * Function's return value points to the data payload of the tuple + */ static Form_pg_sequence -read_info(SeqTable elm, Relation rel, Buffer *buf) +read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple) { Page page; ItemId lp; - HeapTupleData tuple; sequence_magic *sm; Form_pg_sequence seq; @@ -1088,7 +1092,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) lp = PageGetItemId(page, FirstOffsetNumber); Assert(ItemIdIsNormal(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp); + + /* Note we currently only bother to set these two fields of *seqtuple */ + seqtuple->t_data = (HeapTupleHeader) PageGetItem(page, lp); + seqtuple->t_len = ItemIdGetLength(lp); /* * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on @@ -1098,15 +1105,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) * bit update, ie, don't bother to WAL-log it, since we can certainly do * this again if the update gets lost. */ - if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId) + if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId) { - HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId); - tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; - tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId); + seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; + seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(*buf); } - seq = (Form_pg_sequence) GETSTRUCT(&tuple); + seq = (Form_pg_sequence) GETSTRUCT(seqtuple); /* this is a handy place to update our copy of the increment */ elm->increment = seq->increment_by; @@ -1210,6 +1217,13 @@ init_params(List *options, bool isInit, defel->defname); } + /* + * We must reset log_cnt when isInit or when changing any parameters + * that would affect future nextval allocations. + */ + if (isInit) + new->log_cnt = 0; + /* INCREMENT BY */ if (increment_by != NULL) { @@ -1218,6 +1232,7 @@ init_params(List *options, bool isInit, ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("INCREMENT must not be zero"))); + new->log_cnt = 0; } else if (isInit) new->increment_by = 1; @@ -1227,30 +1242,39 @@ init_params(List *options, bool isInit, { new->is_cycled = intVal(is_cycled->arg); Assert(BoolIsValid(new->is_cycled)); + new->log_cnt = 0; } else if (isInit) new->is_cycled = false; /* MAXVALUE (null arg means NO MAXVALUE) */ if (max_value != NULL && max_value->arg) + { new->max_value = defGetInt64(max_value); + new->log_cnt = 0; + } else if (isInit || max_value != NULL) { if (new->increment_by > 0) new->max_value = SEQ_MAXVALUE; /* ascending seq */ else new->max_value = -1; /* descending seq */ + new->log_cnt = 0; } /* MINVALUE (null arg means NO MINVALUE) */ if (min_value != NULL && min_value->arg) + { new->min_value = defGetInt64(min_value); + new->log_cnt = 0; + } else if (isInit || min_value != NULL) { if (new->increment_by > 0) new->min_value = 1; /* ascending seq */ else new->min_value = SEQ_MINVALUE; /* descending seq */ + new->log_cnt = 0; } /* crosscheck min/max */ @@ -1312,13 +1336,12 @@ init_params(List *options, bool isInit, else new->last_value = new->start_value; new->is_called = false; - new->log_cnt = 1; + new->log_cnt = 0; } else if (isInit) { new->last_value = new->start_value; new->is_called = false; - new->log_cnt = 1; } /* crosscheck RESTART (or current value, if changing MIN/MAX) */ @@ -1361,6 +1384,7 @@ init_params(List *options, bool isInit, errmsg("CACHE (%s) must be greater than zero", buf))); } + new->log_cnt = 0; } else if (isInit) new->cache_value = 1; @@ -1473,6 +1497,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) SeqTable elm; Relation seqrel; Buffer buf; + HeapTupleData seqtuple; Form_pg_sequence seq; /* open and AccessShareLock sequence */ @@ -1500,7 +1525,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) memset(isnull, 0, sizeof(isnull)); - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); values[0] = Int64GetDatum(seq->start_value); values[1] = Int64GetDatum(seq->min_value); @@ -1555,7 +1580,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record) item = (char *) xlrec + sizeof(xl_seq_rec); itemsz = record->xl_len - sizeof(xl_seq_rec); - itemsz = MAXALIGN(itemsz); + if (PageAddItem(localpage, (Item) item, itemsz, FirstOffsetNumber, false, false) == InvalidOffsetNumber) elog(PANIC, "seq_redo: failed to add item to page"); diff --git a/src/test/regress/expected/sequence.out b/src/test/regress/expected/sequence.out index 1def070903..479dc4a54b 100644 --- a/src/test/regress/expected/sequence.out +++ b/src/test/regress/expected/sequence.out @@ -176,7 +176,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new; SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f + foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f (1 row) SELECT nextval('foo_seq_new'); @@ -194,7 +194,7 @@ SELECT nextval('foo_seq_new'); SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 32 | f | t + foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 31 | f | t (1 row) DROP SEQUENCE foo_seq_new; diff --git a/src/test/regress/expected/sequence_1.out b/src/test/regress/expected/sequence_1.out index 918c88d1cc..26036ab3a7 100644 --- a/src/test/regress/expected/sequence_1.out +++ b/src/test/regress/expected/sequence_1.out @@ -176,7 +176,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new; SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f + foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f (1 row) SELECT nextval('foo_seq_new'); @@ -194,7 +194,7 @@ SELECT nextval('foo_seq_new'); SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 31 | f | t + foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 32 | f | t (1 row) DROP SEQUENCE foo_seq_new; @@ -241,6 +241,9 @@ DROP SEQUENCE myseq2; -- -- Alter sequence -- +ALTER SEQUENCE IF EXISTS sequence_test2 RESTART WITH 24 + INCREMENT BY 4 MAXVALUE 36 MINVALUE 5 CYCLE; +NOTICE: relation "sequence_test2" does not exist, skipping CREATE SEQUENCE sequence_test2 START WITH 32; SELECT nextval('sequence_test2'); nextval -- 2.40.0