]> granicus.if.org Git - postgresql/blob - src/backend/commands/sequence.c
4a56f03e8b65df0e3a2c9536c99b62773244d286
[postgresql] / src / backend / commands / sequence.c
1 /*-------------------------------------------------------------------------
2  *
3  * sequence.c
4  *        PostgreSQL sequences support code.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/sequence.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/bufmask.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "access/xloginsert.h"
24 #include "access/xlogutils.h"
25 #include "catalog/dependency.h"
26 #include "catalog/indexing.h"
27 #include "catalog/namespace.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/pg_sequence.h"
30 #include "catalog/pg_type.h"
31 #include "commands/defrem.h"
32 #include "commands/sequence.h"
33 #include "commands/tablecmds.h"
34 #include "funcapi.h"
35 #include "miscadmin.h"
36 #include "nodes/makefuncs.h"
37 #include "parser/parse_type.h"
38 #include "storage/lmgr.h"
39 #include "storage/proc.h"
40 #include "storage/smgr.h"
41 #include "utils/acl.h"
42 #include "utils/builtins.h"
43 #include "utils/lsyscache.h"
44 #include "utils/resowner.h"
45 #include "utils/syscache.h"
46 #include "utils/varlena.h"
47
48
/*
 * Fetching a value from a sequence is not WAL-logged one fetch at a time;
 * instead a few fetches are logged in advance.  After a crash, as many
 * values as were pre-logged may be lost (skipped over).
 */
#define SEQ_LOG_VALS	32
55
56 /*
57  * The "special area" of a sequence's buffer page looks like this.
58  */
59 #define SEQ_MAGIC         0x1717
60
61 typedef struct sequence_magic
62 {
63         uint32          magic;
64 } sequence_magic;
65
66 /*
67  * We store a SeqTable item for every sequence we have touched in the current
68  * session.  This is needed to hold onto nextval/currval state.  (We can't
69  * rely on the relcache, since it's only, well, a cache, and may decide to
70  * discard entries.)
71  */
72 typedef struct SeqTableData
73 {
74         Oid                     relid;                  /* pg_class OID of this sequence (hash key) */
75         Oid                     filenode;               /* last seen relfilenode of this sequence */
76         LocalTransactionId lxid;        /* xact in which we last did a seq op */
77         bool            last_valid;             /* do we have a valid "last" value? */
78         int64           last;                   /* value last returned by nextval */
79         int64           cached;                 /* last value already cached for nextval */
80         /* if last != cached, we have not used up all the cached values */
81         int64           increment;              /* copy of sequence's increment field */
82         /* note that increment is zero until we first do nextval_internal() */
83 } SeqTableData;
84
85 typedef SeqTableData *SeqTable;
86
87 static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
88
89 /*
90  * last_used_seq is updated by nextval() to point to the last used
91  * sequence.
92  */
93 static SeqTableData *last_used_seq = NULL;
94
95 static void fill_seq_with_data(Relation rel, HeapTuple tuple);
96 static Relation lock_and_open_sequence(SeqTable seq);
97 static void create_seq_hashtable(void);
98 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
99 static Form_pg_sequence_data read_seq_tuple(Relation rel,
100                            Buffer *buf, HeapTuple seqdatatuple);
101 static void init_params(ParseState *pstate, List *options, bool for_identity,
102                         bool isInit,
103                         Form_pg_sequence seqform,
104                         Form_pg_sequence_data seqdataform, List **owned_by);
105 static void do_setval(Oid relid, int64 next, bool iscalled);
106 static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
107
108
109 /*
110  * DefineSequence
111  *                              Creates a new sequence relation
112  */
113 ObjectAddress
114 DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
115 {
116         FormData_pg_sequence seqform;
117         FormData_pg_sequence_data seqdataform;
118         List       *owned_by;
119         CreateStmt *stmt = makeNode(CreateStmt);
120         Oid                     seqoid;
121         ObjectAddress address;
122         Relation        rel;
123         HeapTuple       tuple;
124         TupleDesc       tupDesc;
125         Datum           value[SEQ_COL_LASTCOL];
126         bool            null[SEQ_COL_LASTCOL];
127         Datum           pgs_values[Natts_pg_sequence];
128         bool            pgs_nulls[Natts_pg_sequence];
129         int                     i;
130
131         /* Unlogged sequences are not implemented -- not clear if useful. */
132         if (seq->sequence->relpersistence == RELPERSISTENCE_UNLOGGED)
133                 ereport(ERROR,
134                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
135                                  errmsg("unlogged sequences are not supported")));
136
137         /*
138          * If if_not_exists was given and a relation with the same name already
139          * exists, bail out. (Note: we needn't check this when not if_not_exists,
140          * because DefineRelation will complain anyway.)
141          */
142         if (seq->if_not_exists)
143         {
144                 RangeVarGetAndCheckCreationNamespace(seq->sequence, NoLock, &seqoid);
145                 if (OidIsValid(seqoid))
146                 {
147                         ereport(NOTICE,
148                                         (errcode(ERRCODE_DUPLICATE_TABLE),
149                                          errmsg("relation \"%s\" already exists, skipping",
150                                                         seq->sequence->relname)));
151                         return InvalidObjectAddress;
152                 }
153         }
154
155         /* Check and set all option values */
156         init_params(pstate, seq->options, seq->for_identity, true, &seqform, &seqdataform, &owned_by);
157
158         /*
159          * Create relation (and fill value[] and null[] for the tuple)
160          */
161         stmt->tableElts = NIL;
162         for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
163         {
164                 ColumnDef  *coldef = makeNode(ColumnDef);
165
166                 coldef->inhcount = 0;
167                 coldef->is_local = true;
168                 coldef->is_not_null = true;
169                 coldef->is_from_type = false;
170                 coldef->is_from_parent = false;
171                 coldef->storage = 0;
172                 coldef->raw_default = NULL;
173                 coldef->cooked_default = NULL;
174                 coldef->collClause = NULL;
175                 coldef->collOid = InvalidOid;
176                 coldef->constraints = NIL;
177                 coldef->location = -1;
178
179                 null[i - 1] = false;
180
181                 switch (i)
182                 {
183                         case SEQ_COL_LASTVAL:
184                                 coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
185                                 coldef->colname = "last_value";
186                                 value[i - 1] = Int64GetDatumFast(seqdataform.last_value);
187                                 break;
188                         case SEQ_COL_LOG:
189                                 coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
190                                 coldef->colname = "log_cnt";
191                                 value[i - 1] = Int64GetDatum((int64) 0);
192                                 break;
193                         case SEQ_COL_CALLED:
194                                 coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
195                                 coldef->colname = "is_called";
196                                 value[i - 1] = BoolGetDatum(false);
197                                 break;
198                 }
199                 stmt->tableElts = lappend(stmt->tableElts, coldef);
200         }
201
202         stmt->relation = seq->sequence;
203         stmt->inhRelations = NIL;
204         stmt->constraints = NIL;
205         stmt->options = NIL;
206         stmt->oncommit = ONCOMMIT_NOOP;
207         stmt->tablespacename = NULL;
208         stmt->if_not_exists = seq->if_not_exists;
209
210         address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL);
211         seqoid = address.objectId;
212         Assert(seqoid != InvalidOid);
213
214         rel = heap_open(seqoid, AccessExclusiveLock);
215         tupDesc = RelationGetDescr(rel);
216
217         /* now initialize the sequence's data */
218         tuple = heap_form_tuple(tupDesc, value, null);
219         fill_seq_with_data(rel, tuple);
220
221         /* process OWNED BY if given */
222         if (owned_by)
223                 process_owned_by(rel, owned_by, seq->for_identity);
224
225         heap_close(rel, NoLock);
226
227         /* fill in pg_sequence */
228         rel = heap_open(SequenceRelationId, RowExclusiveLock);
229         tupDesc = RelationGetDescr(rel);
230
231         memset(pgs_nulls, 0, sizeof(pgs_nulls));
232
233         pgs_values[Anum_pg_sequence_seqrelid - 1] = ObjectIdGetDatum(seqoid);
234         pgs_values[Anum_pg_sequence_seqtypid - 1] = ObjectIdGetDatum(seqform.seqtypid);
235         pgs_values[Anum_pg_sequence_seqstart - 1] = Int64GetDatumFast(seqform.seqstart);
236         pgs_values[Anum_pg_sequence_seqincrement - 1] = Int64GetDatumFast(seqform.seqincrement);
237         pgs_values[Anum_pg_sequence_seqmax - 1] = Int64GetDatumFast(seqform.seqmax);
238         pgs_values[Anum_pg_sequence_seqmin - 1] = Int64GetDatumFast(seqform.seqmin);
239         pgs_values[Anum_pg_sequence_seqcache - 1] = Int64GetDatumFast(seqform.seqcache);
240         pgs_values[Anum_pg_sequence_seqcycle - 1] = BoolGetDatum(seqform.seqcycle);
241
242         tuple = heap_form_tuple(tupDesc, pgs_values, pgs_nulls);
243         CatalogTupleInsert(rel, tuple);
244
245         heap_freetuple(tuple);
246         heap_close(rel, RowExclusiveLock);
247
248         return address;
249 }
250
251 /*
252  * Reset a sequence to its initial value.
253  *
254  * The change is made transactionally, so that on failure of the current
255  * transaction, the sequence will be restored to its previous state.
256  * We do that by creating a whole new relfilenode for the sequence; so this
257  * works much like the rewriting forms of ALTER TABLE.
258  *
259  * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
260  * which must not be released until end of transaction.  Caller is also
261  * responsible for permissions checking.
262  */
263 void
264 ResetSequence(Oid seq_relid)
265 {
266         Relation        seq_rel;
267         SeqTable        elm;
268         Form_pg_sequence_data seq;
269         Buffer          buf;
270         HeapTupleData seqdatatuple;
271         HeapTuple       tuple;
272         HeapTuple       pgstuple;
273         Form_pg_sequence pgsform;
274         int64           startv;
275
276         /*
277          * Read the old sequence.  This does a bit more work than really
278          * necessary, but it's simple, and we do want to double-check that it's
279          * indeed a sequence.
280          */
281         init_sequence(seq_relid, &elm, &seq_rel);
282         (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
283
284         pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seq_relid));
285         if (!HeapTupleIsValid(pgstuple))
286                 elog(ERROR, "cache lookup failed for sequence %u", seq_relid);
287         pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
288         startv = pgsform->seqstart;
289         ReleaseSysCache(pgstuple);
290
291         /*
292          * Copy the existing sequence tuple.
293          */
294         tuple = heap_copytuple(&seqdatatuple);
295
296         /* Now we're done with the old page */
297         UnlockReleaseBuffer(buf);
298
299         /*
300          * Modify the copied tuple to execute the restart (compare the RESTART
301          * action in AlterSequence)
302          */
303         seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
304         seq->last_value = startv;
305         seq->is_called = false;
306         seq->log_cnt = 0;
307
308         /*
309          * Create a new storage file for the sequence.  We want to keep the
310          * sequence's relfrozenxid at 0, since it won't contain any unfrozen XIDs.
311          * Same with relminmxid, since a sequence will never contain multixacts.
312          */
313         RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence,
314                                                           InvalidTransactionId, InvalidMultiXactId);
315
316         /*
317          * Insert the modified tuple into the new storage file.
318          */
319         fill_seq_with_data(seq_rel, tuple);
320
321         /* Clear local cache so that we don't think we have cached numbers */
322         /* Note that we do not change the currval() state */
323         elm->cached = elm->last;
324
325         relation_close(seq_rel, NoLock);
326 }
327
328 /*
329  * Initialize a sequence's relation with the specified tuple as content
330  */
331 static void
332 fill_seq_with_data(Relation rel, HeapTuple tuple)
333 {
334         Buffer          buf;
335         Page            page;
336         sequence_magic *sm;
337         OffsetNumber offnum;
338
339         /* Initialize first page of relation with special magic number */
340
341         buf = ReadBuffer(rel, P_NEW);
342         Assert(BufferGetBlockNumber(buf) == 0);
343
344         page = BufferGetPage(buf);
345
346         PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
347         sm = (sequence_magic *) PageGetSpecialPointer(page);
348         sm->magic = SEQ_MAGIC;
349
350         /* Now insert sequence tuple */
351
352         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
353
354         /*
355          * Since VACUUM does not process sequences, we have to force the tuple to
356          * have xmin = FrozenTransactionId now.  Otherwise it would become
357          * invisible to SELECTs after 2G transactions.  It is okay to do this
358          * because if the current transaction aborts, no other xact will ever
359          * examine the sequence tuple anyway.
360          */
361         HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
362         HeapTupleHeaderSetXminFrozen(tuple->t_data);
363         HeapTupleHeaderSetCmin(tuple->t_data, FirstCommandId);
364         HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
365         tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
366         ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
367
368         /* check the comment above nextval_internal()'s equivalent call. */
369         if (RelationNeedsWAL(rel))
370                 GetTopTransactionId();
371
372         START_CRIT_SECTION();
373
374         MarkBufferDirty(buf);
375
376         offnum = PageAddItem(page, (Item) tuple->t_data, tuple->t_len,
377                                                  InvalidOffsetNumber, false, false);
378         if (offnum != FirstOffsetNumber)
379                 elog(ERROR, "failed to add sequence tuple to page");
380
381         /* XLOG stuff */
382         if (RelationNeedsWAL(rel))
383         {
384                 xl_seq_rec      xlrec;
385                 XLogRecPtr      recptr;
386
387                 XLogBeginInsert();
388                 XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
389
390                 xlrec.node = rel->rd_node;
391
392                 XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
393                 XLogRegisterData((char *) tuple->t_data, tuple->t_len);
394
395                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
396
397                 PageSetLSN(page, recptr);
398         }
399
400         END_CRIT_SECTION();
401
402         UnlockReleaseBuffer(buf);
403 }
404
405 /*
406  * AlterSequence
407  *
408  * Modify the definition of a sequence relation
409  */
410 ObjectAddress
411 AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
412 {
413         Oid                     relid;
414         SeqTable        elm;
415         Relation        seqrel;
416         Buffer          buf;
417         HeapTupleData datatuple;
418         Form_pg_sequence seqform;
419         Form_pg_sequence_data newdataform;
420         List       *owned_by;
421         ObjectAddress address;
422         Relation        rel;
423         HeapTuple       seqtuple;
424         HeapTuple       newdatatuple;
425
426         /* Open and lock sequence. */
427         relid = RangeVarGetRelid(stmt->sequence,
428                                                          ShareRowExclusiveLock,
429                                                          stmt->missing_ok);
430         if (relid == InvalidOid)
431         {
432                 ereport(NOTICE,
433                                 (errmsg("relation \"%s\" does not exist, skipping",
434                                                 stmt->sequence->relname)));
435                 return InvalidObjectAddress;
436         }
437
438         init_sequence(relid, &elm, &seqrel);
439
440         /* allow ALTER to sequence owner only */
441         if (!pg_class_ownercheck(relid, GetUserId()))
442                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
443                                            stmt->sequence->relname);
444
445         rel = heap_open(SequenceRelationId, RowExclusiveLock);
446         seqtuple = SearchSysCacheCopy1(SEQRELID,
447                                                                    ObjectIdGetDatum(relid));
448         if (!HeapTupleIsValid(seqtuple))
449                 elog(ERROR, "cache lookup failed for sequence %u",
450                          relid);
451
452         seqform = (Form_pg_sequence) GETSTRUCT(seqtuple);
453
454         /* lock page's buffer and read tuple into new sequence structure */
455         (void) read_seq_tuple(seqrel, &buf, &datatuple);
456
457         /* copy the existing sequence data tuple, so it can be modified localy */
458         newdatatuple = heap_copytuple(&datatuple);
459         newdataform = (Form_pg_sequence_data) GETSTRUCT(newdatatuple);
460
461         UnlockReleaseBuffer(buf);
462
463         /* Check and set new values */
464         init_params(pstate, stmt->options, stmt->for_identity, false, seqform,
465                                 newdataform, &owned_by);
466
467         /* Clear local cache so that we don't think we have cached numbers */
468         /* Note that we do not change the currval() state */
469         elm->cached = elm->last;
470
471         /* check the comment above nextval_internal()'s equivalent call. */
472         if (RelationNeedsWAL(seqrel))
473                 GetTopTransactionId();
474
475         /*
476          * Create a new storage file for the sequence, making the state changes
477          * transactional.  We want to keep the sequence's relfrozenxid at 0, since
478          * it won't contain any unfrozen XIDs.  Same with relminmxid, since a
479          * sequence will never contain multixacts.
480          */
481         RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence,
482                                                           InvalidTransactionId, InvalidMultiXactId);
483
484         /*
485          * Insert the modified tuple into the new storage file.
486          */
487         fill_seq_with_data(seqrel, newdatatuple);
488
489         /* process OWNED BY if given */
490         if (owned_by)
491                 process_owned_by(seqrel, owned_by, stmt->for_identity);
492
493         InvokeObjectPostAlterHook(RelationRelationId, relid, 0);
494
495         ObjectAddressSet(address, RelationRelationId, relid);
496
497         CatalogTupleUpdate(rel, &seqtuple->t_self, seqtuple);
498
499         heap_close(rel, RowExclusiveLock);
500         relation_close(seqrel, NoLock);
501
502         return address;
503 }
504
505 void
506 DeleteSequenceTuple(Oid relid)
507 {
508         Relation        rel;
509         HeapTuple       tuple;
510
511         rel = heap_open(SequenceRelationId, RowExclusiveLock);
512
513         tuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(relid));
514         if (!HeapTupleIsValid(tuple))
515                 elog(ERROR, "cache lookup failed for sequence %u", relid);
516
517         CatalogTupleDelete(rel, &tuple->t_self);
518
519         ReleaseSysCache(tuple);
520         heap_close(rel, RowExclusiveLock);
521 }
522
523 /*
524  * Note: nextval with a text argument is no longer exported as a pg_proc
525  * entry, but we keep it around to ease porting of C code that may have
526  * called the function directly.
527  */
528 Datum
529 nextval(PG_FUNCTION_ARGS)
530 {
531         text       *seqin = PG_GETARG_TEXT_PP(0);
532         RangeVar   *sequence;
533         Oid                     relid;
534
535         sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
536
537         /*
538          * XXX: This is not safe in the presence of concurrent DDL, but acquiring
539          * a lock here is more expensive than letting nextval_internal do it,
540          * since the latter maintains a cache that keeps us from hitting the lock
541          * manager more than once per transaction.  It's not clear whether the
542          * performance penalty is material in practice, but for now, we do it this
543          * way.
544          */
545         relid = RangeVarGetRelid(sequence, NoLock, false);
546
547         PG_RETURN_INT64(nextval_internal(relid, true));
548 }
549
550 Datum
551 nextval_oid(PG_FUNCTION_ARGS)
552 {
553         Oid                     relid = PG_GETARG_OID(0);
554
555         PG_RETURN_INT64(nextval_internal(relid, true));
556 }
557
558 int64
559 nextval_internal(Oid relid, bool check_permissions)
560 {
561         SeqTable        elm;
562         Relation        seqrel;
563         Buffer          buf;
564         Page            page;
565         HeapTuple       pgstuple;
566         Form_pg_sequence pgsform;
567         HeapTupleData seqdatatuple;
568         Form_pg_sequence_data seq;
569         int64           incby,
570                                 maxv,
571                                 minv,
572                                 cache,
573                                 log,
574                                 fetch,
575                                 last;
576         int64           result,
577                                 next,
578                                 rescnt = 0;
579         bool            cycle;
580         bool            logit = false;
581
582         /* open and lock sequence */
583         init_sequence(relid, &elm, &seqrel);
584
585         if (check_permissions &&
586                 pg_class_aclcheck(elm->relid, GetUserId(),
587                                                   ACL_USAGE | ACL_UPDATE) != ACLCHECK_OK)
588                 ereport(ERROR,
589                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
590                                  errmsg("permission denied for sequence %s",
591                                                 RelationGetRelationName(seqrel))));
592
593         /* read-only transactions may only modify temp sequences */
594         if (!seqrel->rd_islocaltemp)
595                 PreventCommandIfReadOnly("nextval()");
596
597         /*
598          * Forbid this during parallel operation because, to make it work, the
599          * cooperating backends would need to share the backend-local cached
600          * sequence information.  Currently, we don't support that.
601          */
602         PreventCommandIfParallelMode("nextval()");
603
604         if (elm->last != elm->cached)           /* some numbers were cached */
605         {
606                 Assert(elm->last_valid);
607                 Assert(elm->increment != 0);
608                 elm->last += elm->increment;
609                 relation_close(seqrel, NoLock);
610                 last_used_seq = elm;
611                 return elm->last;
612         }
613
614         pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(relid));
615         if (!HeapTupleIsValid(pgstuple))
616                 elog(ERROR, "cache lookup failed for sequence %u", relid);
617         pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
618         incby = pgsform->seqincrement;
619         maxv = pgsform->seqmax;
620         minv = pgsform->seqmin;
621         cache = pgsform->seqcache;
622         cycle = pgsform->seqcycle;
623         ReleaseSysCache(pgstuple);
624
625         /* lock page' buffer and read tuple */
626         seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
627         page = BufferGetPage(buf);
628
629         elm->increment = incby;
630         last = next = result = seq->last_value;
631         fetch = cache;
632         log = seq->log_cnt;
633
634         if (!seq->is_called)
635         {
636                 rescnt++;                               /* return last_value if not is_called */
637                 fetch--;
638         }
639
640         /*
641          * Decide whether we should emit a WAL log record.  If so, force up the
642          * fetch count to grab SEQ_LOG_VALS more values than we actually need to
643          * cache.  (These will then be usable without logging.)
644          *
645          * If this is the first nextval after a checkpoint, we must force a new
646          * WAL record to be written anyway, else replay starting from the
647          * checkpoint would fail to advance the sequence past the logged values.
648          * In this case we may as well fetch extra values.
649          */
650         if (log < fetch || !seq->is_called)
651         {
652                 /* forced log to satisfy local demand for values */
653                 fetch = log = fetch + SEQ_LOG_VALS;
654                 logit = true;
655         }
656         else
657         {
658                 XLogRecPtr      redoptr = GetRedoRecPtr();
659
660                 if (PageGetLSN(page) <= redoptr)
661                 {
662                         /* last update of seq was before checkpoint */
663                         fetch = log = fetch + SEQ_LOG_VALS;
664                         logit = true;
665                 }
666         }
667
668         while (fetch)                           /* try to fetch cache [+ log ] numbers */
669         {
670                 /*
671                  * Check MAXVALUE for ascending sequences and MINVALUE for descending
672                  * sequences
673                  */
674                 if (incby > 0)
675                 {
676                         /* ascending sequence */
677                         if ((maxv >= 0 && next > maxv - incby) ||
678                                 (maxv < 0 && next + incby > maxv))
679                         {
680                                 if (rescnt > 0)
681                                         break;          /* stop fetching */
682                                 if (!cycle)
683                                 {
684                                         char            buf[100];
685
686                                         snprintf(buf, sizeof(buf), INT64_FORMAT, maxv);
687                                         ereport(ERROR,
688                                                  (errcode(ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED),
689                                                   errmsg("nextval: reached maximum value of sequence \"%s\" (%s)",
690                                                                  RelationGetRelationName(seqrel), buf)));
691                                 }
692                                 next = minv;
693                         }
694                         else
695                                 next += incby;
696                 }
697                 else
698                 {
699                         /* descending sequence */
700                         if ((minv < 0 && next < minv - incby) ||
701                                 (minv >= 0 && next + incby < minv))
702                         {
703                                 if (rescnt > 0)
704                                         break;          /* stop fetching */
705                                 if (!cycle)
706                                 {
707                                         char            buf[100];
708
709                                         snprintf(buf, sizeof(buf), INT64_FORMAT, minv);
710                                         ereport(ERROR,
711                                                  (errcode(ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED),
712                                                   errmsg("nextval: reached minimum value of sequence \"%s\" (%s)",
713                                                                  RelationGetRelationName(seqrel), buf)));
714                                 }
715                                 next = maxv;
716                         }
717                         else
718                                 next += incby;
719                 }
720                 fetch--;
721                 if (rescnt < cache)
722                 {
723                         log--;
724                         rescnt++;
725                         last = next;
726                         if (rescnt == 1)        /* if it's first result - */
727                                 result = next;  /* it's what to return */
728                 }
729         }
730
731         log -= fetch;                           /* adjust for any unfetched numbers */
732         Assert(log >= 0);
733
734         /* save info in local cache */
735         elm->last = result;                     /* last returned number */
736         elm->cached = last;                     /* last fetched number */
737         elm->last_valid = true;
738
739         last_used_seq = elm;
740
741         /*
742          * If something needs to be WAL logged, acquire an xid, so this
743          * transaction's commit will trigger a WAL flush and wait for syncrep.
744          * It's sufficient to ensure the toplevel transaction has an xid, no need
745          * to assign xids subxacts, that'll already trigger an appropriate wait.
746          * (Have to do that here, so we're outside the critical section)
747          */
748         if (logit && RelationNeedsWAL(seqrel))
749                 GetTopTransactionId();
750
751         /* ready to change the on-disk (or really, in-buffer) tuple */
752         START_CRIT_SECTION();
753
754         /*
755          * We must mark the buffer dirty before doing XLogInsert(); see notes in
756          * SyncOneBuffer().  However, we don't apply the desired changes just yet.
757          * This looks like a violation of the buffer update protocol, but it is in
758          * fact safe because we hold exclusive lock on the buffer.  Any other
759          * process, including a checkpoint, that tries to examine the buffer
760          * contents will block until we release the lock, and then will see the
761          * final state that we install below.
762          */
763         MarkBufferDirty(buf);
764
765         /* XLOG stuff */
766         if (logit && RelationNeedsWAL(seqrel))
767         {
768                 xl_seq_rec      xlrec;
769                 XLogRecPtr      recptr;
770
771                 /*
772                  * We don't log the current state of the tuple, but rather the state
773                  * as it would appear after "log" more fetches.  This lets us skip
774                  * that many future WAL records, at the cost that we lose those
775                  * sequence values if we crash.
776                  */
777                 XLogBeginInsert();
778                 XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
779
780                 /* set values that will be saved in xlog */
781                 seq->last_value = next;
782                 seq->is_called = true;
783                 seq->log_cnt = 0;
784
785                 xlrec.node = seqrel->rd_node;
786
787                 XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
788                 XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
789
790                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
791
792                 PageSetLSN(page, recptr);
793         }
794
795         /* Now update sequence tuple to the intended final state */
796         seq->last_value = last;         /* last fetched number */
797         seq->is_called = true;
798         seq->log_cnt = log;                     /* how much is logged */
799
800         END_CRIT_SECTION();
801
802         UnlockReleaseBuffer(buf);
803
804         relation_close(seqrel, NoLock);
805
806         return result;
807 }
808
809 Datum
810 currval_oid(PG_FUNCTION_ARGS)
811 {
812         Oid                     relid = PG_GETARG_OID(0);
813         int64           result;
814         SeqTable        elm;
815         Relation        seqrel;
816
817         /* open and lock sequence */
818         init_sequence(relid, &elm, &seqrel);
819
820         if (pg_class_aclcheck(elm->relid, GetUserId(),
821                                                   ACL_SELECT | ACL_USAGE) != ACLCHECK_OK)
822                 ereport(ERROR,
823                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
824                                  errmsg("permission denied for sequence %s",
825                                                 RelationGetRelationName(seqrel))));
826
827         if (!elm->last_valid)
828                 ereport(ERROR,
829                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
830                                  errmsg("currval of sequence \"%s\" is not yet defined in this session",
831                                                 RelationGetRelationName(seqrel))));
832
833         result = elm->last;
834
835         relation_close(seqrel, NoLock);
836
837         PG_RETURN_INT64(result);
838 }
839
840 Datum
841 lastval(PG_FUNCTION_ARGS)
842 {
843         Relation        seqrel;
844         int64           result;
845
846         if (last_used_seq == NULL)
847                 ereport(ERROR,
848                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
849                                  errmsg("lastval is not yet defined in this session")));
850
851         /* Someone may have dropped the sequence since the last nextval() */
852         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(last_used_seq->relid)))
853                 ereport(ERROR,
854                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
855                                  errmsg("lastval is not yet defined in this session")));
856
857         seqrel = lock_and_open_sequence(last_used_seq);
858
859         /* nextval() must have already been called for this sequence */
860         Assert(last_used_seq->last_valid);
861
862         if (pg_class_aclcheck(last_used_seq->relid, GetUserId(),
863                                                   ACL_SELECT | ACL_USAGE) != ACLCHECK_OK)
864                 ereport(ERROR,
865                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
866                                  errmsg("permission denied for sequence %s",
867                                                 RelationGetRelationName(seqrel))));
868
869         result = last_used_seq->last;
870         relation_close(seqrel, NoLock);
871
872         PG_RETURN_INT64(result);
873 }
874
875 /*
876  * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
877  *
878  * Note that the 3 arg version (which sets the is_called flag) is
879  * only for use in pg_dump, and setting the is_called flag may not
880  * work if multiple users are attached to the database and referencing
881  * the sequence (unlikely if pg_dump is restoring it).
882  *
883  * It is necessary to have the 3 arg version so that pg_dump can
884  * restore the state of a sequence exactly during data-only restores -
885  * it is the only way to clear the is_called flag in an existing
886  * sequence.
887  */
888 static void
889 do_setval(Oid relid, int64 next, bool iscalled)
890 {
891         SeqTable        elm;
892         Relation        seqrel;
893         Buffer          buf;
894         HeapTupleData seqdatatuple;
895         Form_pg_sequence_data seq;
896         HeapTuple       pgstuple;
897         Form_pg_sequence pgsform;
898         int64           maxv,
899                                 minv;
900
901         /* open and lock sequence */
902         init_sequence(relid, &elm, &seqrel);
903
904         if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
905                 ereport(ERROR,
906                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
907                                  errmsg("permission denied for sequence %s",
908                                                 RelationGetRelationName(seqrel))));
909
910         pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(relid));
911         if (!HeapTupleIsValid(pgstuple))
912                 elog(ERROR, "cache lookup failed for sequence %u", relid);
913         pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
914         maxv = pgsform->seqmax;
915         minv = pgsform->seqmin;
916         ReleaseSysCache(pgstuple);
917
918         /* read-only transactions may only modify temp sequences */
919         if (!seqrel->rd_islocaltemp)
920                 PreventCommandIfReadOnly("setval()");
921
922         /*
923          * Forbid this during parallel operation because, to make it work, the
924          * cooperating backends would need to share the backend-local cached
925          * sequence information.  Currently, we don't support that.
926          */
927         PreventCommandIfParallelMode("setval()");
928
929         /* lock page' buffer and read tuple */
930         seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
931
932         if ((next < minv) || (next > maxv))
933         {
934                 char            bufv[100],
935                                         bufm[100],
936                                         bufx[100];
937
938                 snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
939                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, minv);
940                 snprintf(bufx, sizeof(bufx), INT64_FORMAT, maxv);
941                 ereport(ERROR,
942                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
943                                  errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
944                                                 bufv, RelationGetRelationName(seqrel),
945                                                 bufm, bufx)));
946         }
947
948         /* Set the currval() state only if iscalled = true */
949         if (iscalled)
950         {
951                 elm->last = next;               /* last returned number */
952                 elm->last_valid = true;
953         }
954
955         /* In any case, forget any future cached numbers */
956         elm->cached = elm->last;
957
958         /* check the comment above nextval_internal()'s equivalent call. */
959         if (RelationNeedsWAL(seqrel))
960                 GetTopTransactionId();
961
962         /* ready to change the on-disk (or really, in-buffer) tuple */
963         START_CRIT_SECTION();
964
965         seq->last_value = next;         /* last fetched number */
966         seq->is_called = iscalled;
967         seq->log_cnt = 0;
968
969         MarkBufferDirty(buf);
970
971         /* XLOG stuff */
972         if (RelationNeedsWAL(seqrel))
973         {
974                 xl_seq_rec      xlrec;
975                 XLogRecPtr      recptr;
976                 Page            page = BufferGetPage(buf);
977
978                 XLogBeginInsert();
979                 XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
980
981                 xlrec.node = seqrel->rd_node;
982                 XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
983                 XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
984
985                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
986
987                 PageSetLSN(page, recptr);
988         }
989
990         END_CRIT_SECTION();
991
992         UnlockReleaseBuffer(buf);
993
994         relation_close(seqrel, NoLock);
995 }
996
997 /*
998  * Implement the 2 arg setval procedure.
999  * See do_setval for discussion.
1000  */
1001 Datum
1002 setval_oid(PG_FUNCTION_ARGS)
1003 {
1004         Oid                     relid = PG_GETARG_OID(0);
1005         int64           next = PG_GETARG_INT64(1);
1006
1007         do_setval(relid, next, true);
1008
1009         PG_RETURN_INT64(next);
1010 }
1011
1012 /*
1013  * Implement the 3 arg setval procedure.
1014  * See do_setval for discussion.
1015  */
1016 Datum
1017 setval3_oid(PG_FUNCTION_ARGS)
1018 {
1019         Oid                     relid = PG_GETARG_OID(0);
1020         int64           next = PG_GETARG_INT64(1);
1021         bool            iscalled = PG_GETARG_BOOL(2);
1022
1023         do_setval(relid, next, iscalled);
1024
1025         PG_RETURN_INT64(next);
1026 }
1027
1028
1029 /*
1030  * Open the sequence and acquire lock if needed
1031  *
1032  * If we haven't touched the sequence already in this transaction,
1033  * we need to acquire a lock.  We arrange for the lock to
1034  * be owned by the top transaction, so that we don't need to do it
1035  * more than once per xact.
1036  */
1037 static Relation
1038 lock_and_open_sequence(SeqTable seq)
1039 {
1040         LocalTransactionId thislxid = MyProc->lxid;
1041
1042         /* Get the lock if not already held in this xact */
1043         if (seq->lxid != thislxid)
1044         {
1045                 ResourceOwner currentOwner;
1046
1047                 currentOwner = CurrentResourceOwner;
1048                 PG_TRY();
1049                 {
1050                         CurrentResourceOwner = TopTransactionResourceOwner;
1051                         LockRelationOid(seq->relid, RowExclusiveLock);
1052                 }
1053                 PG_CATCH();
1054                 {
1055                         /* Ensure CurrentResourceOwner is restored on error */
1056                         CurrentResourceOwner = currentOwner;
1057                         PG_RE_THROW();
1058                 }
1059                 PG_END_TRY();
1060                 CurrentResourceOwner = currentOwner;
1061
1062                 /* Flag that we have a lock in the current xact */
1063                 seq->lxid = thislxid;
1064         }
1065
1066         /* We now know we have the lock, and can safely open the rel */
1067         return relation_open(seq->relid, NoLock);
1068 }
1069
1070 /*
1071  * Creates the hash table for storing sequence data
1072  */
1073 static void
1074 create_seq_hashtable(void)
1075 {
1076         HASHCTL         ctl;
1077
1078         memset(&ctl, 0, sizeof(ctl));
1079         ctl.keysize = sizeof(Oid);
1080         ctl.entrysize = sizeof(SeqTableData);
1081
1082         seqhashtab = hash_create("Sequence values", 16, &ctl,
1083                                                          HASH_ELEM | HASH_BLOBS);
1084 }
1085
1086 /*
1087  * Given a relation OID, open and lock the sequence.  p_elm and p_rel are
1088  * output parameters.
1089  */
1090 static void
1091 init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
1092 {
1093         SeqTable        elm;
1094         Relation        seqrel;
1095         bool            found;
1096
1097         /* Find or create a hash table entry for this sequence */
1098         if (seqhashtab == NULL)
1099                 create_seq_hashtable();
1100
1101         elm = (SeqTable) hash_search(seqhashtab, &relid, HASH_ENTER, &found);
1102
1103         /*
1104          * Initialize the new hash table entry if it did not exist already.
1105          *
1106          * NOTE: seqtable entries are stored for the life of a backend (unless
1107          * explicitly discarded with DISCARD). If the sequence itself is deleted
1108          * then the entry becomes wasted memory, but it's small enough that this
1109          * should not matter.
1110          */
1111         if (!found)
1112         {
1113                 /* relid already filled in */
1114                 elm->filenode = InvalidOid;
1115                 elm->lxid = InvalidLocalTransactionId;
1116                 elm->last_valid = false;
1117                 elm->last = elm->cached = 0;
1118         }
1119
1120         /*
1121          * Open the sequence relation.
1122          */
1123         seqrel = lock_and_open_sequence(elm);
1124
1125         if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
1126                 ereport(ERROR,
1127                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1128                                  errmsg("\"%s\" is not a sequence",
1129                                                 RelationGetRelationName(seqrel))));
1130
1131         /*
1132          * If the sequence has been transactionally replaced since we last saw it,
1133          * discard any cached-but-unissued values.  We do not touch the currval()
1134          * state, however.
1135          */
1136         if (seqrel->rd_rel->relfilenode != elm->filenode)
1137         {
1138                 elm->filenode = seqrel->rd_rel->relfilenode;
1139                 elm->cached = elm->last;
1140         }
1141
1142         /* Return results */
1143         *p_elm = elm;
1144         *p_rel = seqrel;
1145 }
1146
1147
1148 /*
1149  * Given an opened sequence relation, lock the page buffer and find the tuple
1150  *
1151  * *buf receives the reference to the pinned-and-ex-locked buffer
1152  * *seqdatatuple receives the reference to the sequence tuple proper
1153  *              (this arg should point to a local variable of type HeapTupleData)
1154  *
1155  * Function's return value points to the data payload of the tuple
1156  */
1157 static Form_pg_sequence_data
1158 read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
1159 {
1160         Page            page;
1161         ItemId          lp;
1162         sequence_magic *sm;
1163         Form_pg_sequence_data seq;
1164
1165         *buf = ReadBuffer(rel, 0);
1166         LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
1167
1168         page = BufferGetPage(*buf);
1169         sm = (sequence_magic *) PageGetSpecialPointer(page);
1170
1171         if (sm->magic != SEQ_MAGIC)
1172                 elog(ERROR, "bad magic number in sequence \"%s\": %08X",
1173                          RelationGetRelationName(rel), sm->magic);
1174
1175         lp = PageGetItemId(page, FirstOffsetNumber);
1176         Assert(ItemIdIsNormal(lp));
1177
1178         /* Note we currently only bother to set these two fields of *seqdatatuple */
1179         seqdatatuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
1180         seqdatatuple->t_len = ItemIdGetLength(lp);
1181
1182         /*
1183          * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on
1184          * a sequence, which would leave a non-frozen XID in the sequence tuple's
1185          * xmax, which eventually leads to clog access failures or worse. If we
1186          * see this has happened, clean up after it.  We treat this like a hint
1187          * bit update, ie, don't bother to WAL-log it, since we can certainly do
1188          * this again if the update gets lost.
1189          */
1190         Assert(!(seqdatatuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
1191         if (HeapTupleHeaderGetRawXmax(seqdatatuple->t_data) != InvalidTransactionId)
1192         {
1193                 HeapTupleHeaderSetXmax(seqdatatuple->t_data, InvalidTransactionId);
1194                 seqdatatuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
1195                 seqdatatuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1196                 MarkBufferDirtyHint(*buf, true);
1197         }
1198
1199         seq = (Form_pg_sequence_data) GETSTRUCT(seqdatatuple);
1200
1201         return seq;
1202 }
1203
1204 /*
1205  * init_params: process the options list of CREATE or ALTER SEQUENCE, and
1206  * store the values into appropriate fields of seqform, for changes that go
1207  * into the pg_sequence catalog, and seqdataform for changes to the sequence
1208  * relation itself.  Set *changed_seqform to true if seqform was changed
1209  * (interesting for ALTER SEQUENCE).  Also set *owned_by to any OWNED BY
1210  * option, or to NIL if there is none.
1211  *
1212  * If isInit is true, fill any unspecified options with default values;
1213  * otherwise, do not change existing options that aren't explicitly overridden.
1214  */
1215 static void
1216 init_params(ParseState *pstate, List *options, bool for_identity,
1217                         bool isInit,
1218                         Form_pg_sequence seqform,
1219                         Form_pg_sequence_data seqdataform,
1220                         List **owned_by)
1221 {
1222         DefElem    *as_type = NULL;
1223         DefElem    *start_value = NULL;
1224         DefElem    *restart_value = NULL;
1225         DefElem    *increment_by = NULL;
1226         DefElem    *max_value = NULL;
1227         DefElem    *min_value = NULL;
1228         DefElem    *cache_value = NULL;
1229         DefElem    *is_cycled = NULL;
1230         ListCell   *option;
1231         bool            reset_max_value = false;
1232         bool            reset_min_value = false;
1233
1234         *owned_by = NIL;
1235
1236         foreach(option, options)
1237         {
1238                 DefElem    *defel = (DefElem *) lfirst(option);
1239
1240                 if (strcmp(defel->defname, "as") == 0)
1241                 {
1242                         if (as_type)
1243                                 ereport(ERROR,
1244                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1245                                                  errmsg("conflicting or redundant options"),
1246                                                  parser_errposition(pstate, defel->location)));
1247                         as_type = defel;
1248                 }
1249                 else if (strcmp(defel->defname, "increment") == 0)
1250                 {
1251                         if (increment_by)
1252                                 ereport(ERROR,
1253                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1254                                                  errmsg("conflicting or redundant options"),
1255                                                  parser_errposition(pstate, defel->location)));
1256                         increment_by = defel;
1257                 }
1258                 else if (strcmp(defel->defname, "start") == 0)
1259                 {
1260                         if (start_value)
1261                                 ereport(ERROR,
1262                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1263                                                  errmsg("conflicting or redundant options"),
1264                                                  parser_errposition(pstate, defel->location)));
1265                         start_value = defel;
1266                 }
1267                 else if (strcmp(defel->defname, "restart") == 0)
1268                 {
1269                         if (restart_value)
1270                                 ereport(ERROR,
1271                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1272                                                  errmsg("conflicting or redundant options"),
1273                                                  parser_errposition(pstate, defel->location)));
1274                         restart_value = defel;
1275                 }
1276                 else if (strcmp(defel->defname, "maxvalue") == 0)
1277                 {
1278                         if (max_value)
1279                                 ereport(ERROR,
1280                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1281                                                  errmsg("conflicting or redundant options"),
1282                                                  parser_errposition(pstate, defel->location)));
1283                         max_value = defel;
1284                 }
1285                 else if (strcmp(defel->defname, "minvalue") == 0)
1286                 {
1287                         if (min_value)
1288                                 ereport(ERROR,
1289                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1290                                                  errmsg("conflicting or redundant options"),
1291                                                  parser_errposition(pstate, defel->location)));
1292                         min_value = defel;
1293                 }
1294                 else if (strcmp(defel->defname, "cache") == 0)
1295                 {
1296                         if (cache_value)
1297                                 ereport(ERROR,
1298                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1299                                                  errmsg("conflicting or redundant options"),
1300                                                  parser_errposition(pstate, defel->location)));
1301                         cache_value = defel;
1302                 }
1303                 else if (strcmp(defel->defname, "cycle") == 0)
1304                 {
1305                         if (is_cycled)
1306                                 ereport(ERROR,
1307                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1308                                                  errmsg("conflicting or redundant options"),
1309                                                  parser_errposition(pstate, defel->location)));
1310                         is_cycled = defel;
1311                 }
1312                 else if (strcmp(defel->defname, "owned_by") == 0)
1313                 {
1314                         if (*owned_by)
1315                                 ereport(ERROR,
1316                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1317                                                  errmsg("conflicting or redundant options"),
1318                                                  parser_errposition(pstate, defel->location)));
1319                         *owned_by = defGetQualifiedName(defel);
1320                 }
1321                 else if (strcmp(defel->defname, "sequence_name") == 0)
1322                 {
1323                         /*
1324                          * The parser allows this, but it is only for identity columns, in
1325                          * which case it is filtered out in parse_utilcmd.c.  We only get
1326                          * here if someone puts it into a CREATE SEQUENCE.
1327                          */
1328                         ereport(ERROR,
1329                                         (errcode(ERRCODE_SYNTAX_ERROR),
1330                                          errmsg("invalid sequence option SEQUENCE NAME"),
1331                                          parser_errposition(pstate, defel->location)));
1332                 }
1333                 else
1334                         elog(ERROR, "option \"%s\" not recognized",
1335                                  defel->defname);
1336         }
1337
1338         /*
1339          * We must reset log_cnt when isInit or when changing any parameters that
1340          * would affect future nextval allocations.
1341          */
1342         if (isInit)
1343                 seqdataform->log_cnt = 0;
1344
1345         /* AS type */
1346         if (as_type != NULL)
1347         {
1348                 Oid                     newtypid = typenameTypeId(pstate, defGetTypeName(as_type));
1349
1350                 if (newtypid != INT2OID &&
1351                         newtypid != INT4OID &&
1352                         newtypid != INT8OID)
1353                         ereport(ERROR,
1354                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1355                                          for_identity
1356                                          ? errmsg("identity column type must be smallint, integer, or bigint")
1357                         : errmsg("sequence type must be smallint, integer, or bigint")));
1358
1359                 if (!isInit)
1360                 {
1361                         /*
1362                          * When changing type and the old sequence min/max values were the
1363                          * min/max of the old type, adjust sequence min/max values to
1364                          * min/max of new type.  (Otherwise, the user chose explicit
1365                          * min/max values, which we'll leave alone.)
1366                          */
1367                         if ((seqform->seqtypid == INT2OID && seqform->seqmax == PG_INT16_MAX) ||
1368                                 (seqform->seqtypid == INT4OID && seqform->seqmax == PG_INT32_MAX) ||
1369                         (seqform->seqtypid == INT8OID && seqform->seqmax == PG_INT64_MAX))
1370                                 reset_max_value = true;
1371                         if ((seqform->seqtypid == INT2OID && seqform->seqmin == PG_INT16_MIN) ||
1372                                 (seqform->seqtypid == INT4OID && seqform->seqmin == PG_INT32_MIN) ||
1373                         (seqform->seqtypid == INT8OID && seqform->seqmin == PG_INT64_MIN))
1374                                 reset_min_value = true;
1375                 }
1376
1377                 seqform->seqtypid = newtypid;
1378         }
1379         else if (isInit)
1380         {
1381                 seqform->seqtypid = INT8OID;
1382         }
1383
1384         /* INCREMENT BY */
1385         if (increment_by != NULL)
1386         {
1387                 seqform->seqincrement = defGetInt64(increment_by);
1388                 if (seqform->seqincrement == 0)
1389                         ereport(ERROR,
1390                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1391                                          errmsg("INCREMENT must not be zero")));
1392                 seqdataform->log_cnt = 0;
1393         }
1394         else if (isInit)
1395         {
1396                 seqform->seqincrement = 1;
1397         }
1398
1399         /* CYCLE */
1400         if (is_cycled != NULL)
1401         {
1402                 seqform->seqcycle = intVal(is_cycled->arg);
1403                 Assert(BoolIsValid(seqform->seqcycle));
1404                 seqdataform->log_cnt = 0;
1405         }
1406         else if (isInit)
1407         {
1408                 seqform->seqcycle = false;
1409         }
1410
1411         /* MAXVALUE (null arg means NO MAXVALUE) */
1412         if (max_value != NULL && max_value->arg)
1413         {
1414                 seqform->seqmax = defGetInt64(max_value);
1415                 seqdataform->log_cnt = 0;
1416         }
1417         else if (isInit || max_value != NULL || reset_max_value)
1418         {
1419                 if (seqform->seqincrement > 0 || reset_max_value)
1420                 {
1421                         /* ascending seq */
1422                         if (seqform->seqtypid == INT2OID)
1423                                 seqform->seqmax = PG_INT16_MAX;
1424                         else if (seqform->seqtypid == INT4OID)
1425                                 seqform->seqmax = PG_INT32_MAX;
1426                         else
1427                                 seqform->seqmax = PG_INT64_MAX;
1428                 }
1429                 else
1430                         seqform->seqmax = -1;           /* descending seq */
1431                 seqdataform->log_cnt = 0;
1432         }
1433
1434         if ((seqform->seqtypid == INT2OID && (seqform->seqmax < PG_INT16_MIN || seqform->seqmax > PG_INT16_MAX))
1435                 || (seqform->seqtypid == INT4OID && (seqform->seqmax < PG_INT32_MIN || seqform->seqmax > PG_INT32_MAX))
1436                 || (seqform->seqtypid == INT8OID && (seqform->seqmax < PG_INT64_MIN || seqform->seqmax > PG_INT64_MAX)))
1437         {
1438                 char            bufx[100];
1439
1440                 snprintf(bufx, sizeof(bufx), INT64_FORMAT, seqform->seqmax);
1441
1442                 ereport(ERROR,
1443                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1444                         errmsg("MAXVALUE (%s) is out of range for sequence data type %s",
1445                                    bufx, format_type_be(seqform->seqtypid))));
1446         }
1447
1448         /* MINVALUE (null arg means NO MINVALUE) */
1449         if (min_value != NULL && min_value->arg)
1450         {
1451                 seqform->seqmin = defGetInt64(min_value);
1452                 seqdataform->log_cnt = 0;
1453         }
1454         else if (isInit || min_value != NULL || reset_min_value)
1455         {
1456                 if (seqform->seqincrement < 0 || reset_min_value)
1457                 {
1458                         /* descending seq */
1459                         if (seqform->seqtypid == INT2OID)
1460                                 seqform->seqmin = PG_INT16_MIN;
1461                         else if (seqform->seqtypid == INT4OID)
1462                                 seqform->seqmin = PG_INT32_MIN;
1463                         else
1464                                 seqform->seqmin = PG_INT64_MIN;
1465                 }
1466                 else
1467                         seqform->seqmin = 1;    /* ascending seq */
1468                 seqdataform->log_cnt = 0;
1469         }
1470
1471         if ((seqform->seqtypid == INT2OID && (seqform->seqmin < PG_INT16_MIN || seqform->seqmin > PG_INT16_MAX))
1472                 || (seqform->seqtypid == INT4OID && (seqform->seqmin < PG_INT32_MIN || seqform->seqmin > PG_INT32_MAX))
1473                 || (seqform->seqtypid == INT8OID && (seqform->seqmin < PG_INT64_MIN || seqform->seqmin > PG_INT64_MAX)))
1474         {
1475                 char            bufm[100];
1476
1477                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmin);
1478
1479                 ereport(ERROR,
1480                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1481                         errmsg("MINVALUE (%s) is out of range for sequence data type %s",
1482                                    bufm, format_type_be(seqform->seqtypid))));
1483         }
1484
1485         /* crosscheck min/max */
1486         if (seqform->seqmin >= seqform->seqmax)
1487         {
1488                 char            bufm[100],
1489                                         bufx[100];
1490
1491                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmin);
1492                 snprintf(bufx, sizeof(bufx), INT64_FORMAT, seqform->seqmax);
1493                 ereport(ERROR,
1494                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1495                                  errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
1496                                                 bufm, bufx)));
1497         }
1498
1499         /* START WITH */
1500         if (start_value != NULL)
1501         {
1502                 seqform->seqstart = defGetInt64(start_value);
1503         }
1504         else if (isInit)
1505         {
1506                 if (seqform->seqincrement > 0)
1507                         seqform->seqstart = seqform->seqmin;            /* ascending seq */
1508                 else
1509                         seqform->seqstart = seqform->seqmax;            /* descending seq */
1510         }
1511
1512         /* crosscheck START */
1513         if (seqform->seqstart < seqform->seqmin)
1514         {
1515                 char            bufs[100],
1516                                         bufm[100];
1517
1518                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, seqform->seqstart);
1519                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmin);
1520                 ereport(ERROR,
1521                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1522                                  errmsg("START value (%s) cannot be less than MINVALUE (%s)",
1523                                                 bufs, bufm)));
1524         }
1525         if (seqform->seqstart > seqform->seqmax)
1526         {
1527                 char            bufs[100],
1528                                         bufm[100];
1529
1530                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, seqform->seqstart);
1531                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmax);
1532                 ereport(ERROR,
1533                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1534                           errmsg("START value (%s) cannot be greater than MAXVALUE (%s)",
1535                                          bufs, bufm)));
1536         }
1537
1538         /* RESTART [WITH] */
1539         if (restart_value != NULL)
1540         {
1541                 if (restart_value->arg != NULL)
1542                         seqdataform->last_value = defGetInt64(restart_value);
1543                 else
1544                         seqdataform->last_value = seqform->seqstart;
1545                 seqdataform->is_called = false;
1546                 seqdataform->log_cnt = 0;
1547         }
1548         else if (isInit)
1549         {
1550                 seqdataform->last_value = seqform->seqstart;
1551                 seqdataform->is_called = false;
1552         }
1553
1554         /* crosscheck RESTART (or current value, if changing MIN/MAX) */
1555         if (seqdataform->last_value < seqform->seqmin)
1556         {
1557                 char            bufs[100],
1558                                         bufm[100];
1559
1560                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, seqdataform->last_value);
1561                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmin);
1562                 ereport(ERROR,
1563                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1564                            errmsg("RESTART value (%s) cannot be less than MINVALUE (%s)",
1565                                           bufs, bufm)));
1566         }
1567         if (seqdataform->last_value > seqform->seqmax)
1568         {
1569                 char            bufs[100],
1570                                         bufm[100];
1571
1572                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, seqdataform->last_value);
1573                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seqform->seqmax);
1574                 ereport(ERROR,
1575                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1576                         errmsg("RESTART value (%s) cannot be greater than MAXVALUE (%s)",
1577                                    bufs, bufm)));
1578         }
1579
1580         /* CACHE */
1581         if (cache_value != NULL)
1582         {
1583                 seqform->seqcache = defGetInt64(cache_value);
1584                 if (seqform->seqcache <= 0)
1585                 {
1586                         char            buf[100];
1587
1588                         snprintf(buf, sizeof(buf), INT64_FORMAT, seqform->seqcache);
1589                         ereport(ERROR,
1590                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1591                                          errmsg("CACHE (%s) must be greater than zero",
1592                                                         buf)));
1593                 }
1594                 seqdataform->log_cnt = 0;
1595         }
1596         else if (isInit)
1597         {
1598                 seqform->seqcache = 1;
1599         }
1600 }
1601
1602 /*
1603  * Process an OWNED BY option for CREATE/ALTER SEQUENCE
1604  *
1605  * Ownership permissions on the sequence are already checked,
1606  * but if we are establishing a new owned-by dependency, we must
1607  * enforce that the referenced table has the same owner and namespace
1608  * as the sequence.
1609  */
1610 static void
1611 process_owned_by(Relation seqrel, List *owned_by, bool for_identity)
1612 {
1613         DependencyType deptype;
1614         int                     nnames;
1615         Relation        tablerel;
1616         AttrNumber      attnum;
1617
1618         deptype = for_identity ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO;
1619
1620         nnames = list_length(owned_by);
1621         Assert(nnames > 0);
1622         if (nnames == 1)
1623         {
1624                 /* Must be OWNED BY NONE */
1625                 if (strcmp(strVal(linitial(owned_by)), "none") != 0)
1626                         ereport(ERROR,
1627                                         (errcode(ERRCODE_SYNTAX_ERROR),
1628                                          errmsg("invalid OWNED BY option"),
1629                                 errhint("Specify OWNED BY table.column or OWNED BY NONE.")));
1630                 tablerel = NULL;
1631                 attnum = 0;
1632         }
1633         else
1634         {
1635                 List       *relname;
1636                 char       *attrname;
1637                 RangeVar   *rel;
1638
1639                 /* Separate relname and attr name */
1640                 relname = list_truncate(list_copy(owned_by), nnames - 1);
1641                 attrname = strVal(lfirst(list_tail(owned_by)));
1642
1643                 /* Open and lock rel to ensure it won't go away meanwhile */
1644                 rel = makeRangeVarFromNameList(relname);
1645                 tablerel = relation_openrv(rel, AccessShareLock);
1646
1647                 /* Must be a regular or foreign table */
1648                 if (!(tablerel->rd_rel->relkind == RELKIND_RELATION ||
1649                           tablerel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
1650                           tablerel->rd_rel->relkind == RELKIND_VIEW ||
1651                           tablerel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE))
1652                         ereport(ERROR,
1653                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1654                                          errmsg("referenced relation \"%s\" is not a table or foreign table",
1655                                                         RelationGetRelationName(tablerel))));
1656
1657                 /* We insist on same owner and schema */
1658                 if (seqrel->rd_rel->relowner != tablerel->rd_rel->relowner)
1659                         ereport(ERROR,
1660                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1661                                          errmsg("sequence must have same owner as table it is linked to")));
1662                 if (RelationGetNamespace(seqrel) != RelationGetNamespace(tablerel))
1663                         ereport(ERROR,
1664                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1665                                          errmsg("sequence must be in same schema as table it is linked to")));
1666
1667                 /* Now, fetch the attribute number from the system cache */
1668                 attnum = get_attnum(RelationGetRelid(tablerel), attrname);
1669                 if (attnum == InvalidAttrNumber)
1670                         ereport(ERROR,
1671                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
1672                                          errmsg("column \"%s\" of relation \"%s\" does not exist",
1673                                                         attrname, RelationGetRelationName(tablerel))));
1674         }
1675
1676         /*
1677          * Catch user explicitly running OWNED BY on identity sequence.
1678          */
1679         if (deptype == DEPENDENCY_AUTO)
1680         {
1681                 Oid                     tableId;
1682                 int32           colId;
1683
1684                 if (sequenceIsOwned(RelationGetRelid(seqrel), DEPENDENCY_INTERNAL, &tableId, &colId))
1685                         ereport(ERROR,
1686                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1687                                          errmsg("cannot change ownership of identity sequence"),
1688                                          errdetail("Sequence \"%s\" is linked to table \"%s\".",
1689                                                            RelationGetRelationName(seqrel),
1690                                                            get_rel_name(tableId))));
1691         }
1692
1693         /*
1694          * OK, we are ready to update pg_depend.  First remove any existing
1695          * dependencies for the sequence, then optionally add a new one.
1696          */
1697         deleteDependencyRecordsForClass(RelationRelationId, RelationGetRelid(seqrel),
1698                                                                         RelationRelationId, deptype);
1699
1700         if (tablerel)
1701         {
1702                 ObjectAddress refobject,
1703                                         depobject;
1704
1705                 refobject.classId = RelationRelationId;
1706                 refobject.objectId = RelationGetRelid(tablerel);
1707                 refobject.objectSubId = attnum;
1708                 depobject.classId = RelationRelationId;
1709                 depobject.objectId = RelationGetRelid(seqrel);
1710                 depobject.objectSubId = 0;
1711                 recordDependencyOn(&depobject, &refobject, deptype);
1712         }
1713
1714         /* Done, but hold lock until commit */
1715         if (tablerel)
1716                 relation_close(tablerel, NoLock);
1717 }
1718
1719
1720 /*
1721  * Return sequence parameters in a list of the form created by the parser.
1722  */
1723 List *
1724 sequence_options(Oid relid)
1725 {
1726         HeapTuple       pgstuple;
1727         Form_pg_sequence pgsform;
1728         List       *options = NIL;
1729
1730         pgstuple = SearchSysCache1(SEQRELID, relid);
1731         if (!HeapTupleIsValid(pgstuple))
1732                 elog(ERROR, "cache lookup failed for sequence %u", relid);
1733         pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
1734
1735         options = lappend(options, makeDefElem("cache", (Node *) makeInteger(pgsform->seqcache), -1));
1736         options = lappend(options, makeDefElem("cycle", (Node *) makeInteger(pgsform->seqcycle), -1));
1737         options = lappend(options, makeDefElem("increment", (Node *) makeInteger(pgsform->seqincrement), -1));
1738         options = lappend(options, makeDefElem("maxvalue", (Node *) makeInteger(pgsform->seqmax), -1));
1739         options = lappend(options, makeDefElem("minvalue", (Node *) makeInteger(pgsform->seqmin), -1));
1740         options = lappend(options, makeDefElem("start", (Node *) makeInteger(pgsform->seqstart), -1));
1741
1742         ReleaseSysCache(pgstuple);
1743
1744         return options;
1745 }
1746
1747 /*
1748  * Return sequence parameters (formerly for use by information schema)
1749  */
1750 Datum
1751 pg_sequence_parameters(PG_FUNCTION_ARGS)
1752 {
1753         Oid                     relid = PG_GETARG_OID(0);
1754         TupleDesc       tupdesc;
1755         Datum           values[7];
1756         bool            isnull[7];
1757         HeapTuple       pgstuple;
1758         Form_pg_sequence pgsform;
1759
1760         if (pg_class_aclcheck(relid, GetUserId(), ACL_SELECT | ACL_UPDATE | ACL_USAGE) != ACLCHECK_OK)
1761                 ereport(ERROR,
1762                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1763                                  errmsg("permission denied for sequence %s",
1764                                                 get_rel_name(relid))));
1765
1766         tupdesc = CreateTemplateTupleDesc(7, false);
1767         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "start_value",
1768                                            INT8OID, -1, 0);
1769         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "minimum_value",
1770                                            INT8OID, -1, 0);
1771         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "maximum_value",
1772                                            INT8OID, -1, 0);
1773         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "increment",
1774                                            INT8OID, -1, 0);
1775         TupleDescInitEntry(tupdesc, (AttrNumber) 5, "cycle_option",
1776                                            BOOLOID, -1, 0);
1777         TupleDescInitEntry(tupdesc, (AttrNumber) 6, "cache_size",
1778                                            INT8OID, -1, 0);
1779         TupleDescInitEntry(tupdesc, (AttrNumber) 7, "data_type",
1780                                            OIDOID, -1, 0);
1781
1782         BlessTupleDesc(tupdesc);
1783
1784         memset(isnull, 0, sizeof(isnull));
1785
1786         pgstuple = SearchSysCache1(SEQRELID, relid);
1787         if (!HeapTupleIsValid(pgstuple))
1788                 elog(ERROR, "cache lookup failed for sequence %u", relid);
1789         pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
1790
1791         values[0] = Int64GetDatum(pgsform->seqstart);
1792         values[1] = Int64GetDatum(pgsform->seqmin);
1793         values[2] = Int64GetDatum(pgsform->seqmax);
1794         values[3] = Int64GetDatum(pgsform->seqincrement);
1795         values[4] = BoolGetDatum(pgsform->seqcycle);
1796         values[5] = Int64GetDatum(pgsform->seqcache);
1797         values[6] = ObjectIdGetDatum(pgsform->seqtypid);
1798
1799         ReleaseSysCache(pgstuple);
1800
1801         return HeapTupleGetDatum(heap_form_tuple(tupdesc, values, isnull));
1802 }
1803
1804 /*
1805  * Return the last value from the sequence
1806  *
1807  * Note: This has a completely different meaning than lastval().
1808  */
1809 Datum
1810 pg_sequence_last_value(PG_FUNCTION_ARGS)
1811 {
1812         Oid                     relid = PG_GETARG_OID(0);
1813         SeqTable        elm;
1814         Relation        seqrel;
1815         Buffer          buf;
1816         HeapTupleData seqtuple;
1817         Form_pg_sequence_data seq;
1818         bool            is_called;
1819         int64           result;
1820
1821         /* open and lock sequence */
1822         init_sequence(relid, &elm, &seqrel);
1823
1824         if (pg_class_aclcheck(relid, GetUserId(), ACL_SELECT | ACL_USAGE) != ACLCHECK_OK)
1825                 ereport(ERROR,
1826                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1827                                  errmsg("permission denied for sequence %s",
1828                                                 RelationGetRelationName(seqrel))));
1829
1830         seq = read_seq_tuple(seqrel, &buf, &seqtuple);
1831
1832         is_called = seq->is_called;
1833         result = seq->last_value;
1834
1835         UnlockReleaseBuffer(buf);
1836         relation_close(seqrel, NoLock);
1837
1838         if (is_called)
1839                 PG_RETURN_INT64(result);
1840         else
1841                 PG_RETURN_NULL();
1842 }
1843
1844
1845 void
1846 seq_redo(XLogReaderState *record)
1847 {
1848         XLogRecPtr      lsn = record->EndRecPtr;
1849         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1850         Buffer          buffer;
1851         Page            page;
1852         Page            localpage;
1853         char       *item;
1854         Size            itemsz;
1855         xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1856         sequence_magic *sm;
1857
1858         if (info != XLOG_SEQ_LOG)
1859                 elog(PANIC, "seq_redo: unknown op code %u", info);
1860
1861         buffer = XLogInitBufferForRedo(record, 0);
1862         page = (Page) BufferGetPage(buffer);
1863
1864         /*
1865          * We always reinit the page.  However, since this WAL record type is also
1866          * used for updating sequences, it's possible that a hot-standby backend
1867          * is examining the page concurrently; so we mustn't transiently trash the
1868          * buffer.  The solution is to build the correct new page contents in
1869          * local workspace and then memcpy into the buffer.  Then only bytes that
1870          * are supposed to change will change, even transiently. We must palloc
1871          * the local page for alignment reasons.
1872          */
1873         localpage = (Page) palloc(BufferGetPageSize(buffer));
1874
1875         PageInit(localpage, BufferGetPageSize(buffer), sizeof(sequence_magic));
1876         sm = (sequence_magic *) PageGetSpecialPointer(localpage);
1877         sm->magic = SEQ_MAGIC;
1878
1879         item = (char *) xlrec + sizeof(xl_seq_rec);
1880         itemsz = XLogRecGetDataLen(record) - sizeof(xl_seq_rec);
1881
1882         if (PageAddItem(localpage, (Item) item, itemsz,
1883                                         FirstOffsetNumber, false, false) == InvalidOffsetNumber)
1884                 elog(PANIC, "seq_redo: failed to add item to page");
1885
1886         PageSetLSN(localpage, lsn);
1887
1888         memcpy(page, localpage, BufferGetPageSize(buffer));
1889         MarkBufferDirty(buffer);
1890         UnlockReleaseBuffer(buffer);
1891
1892         pfree(localpage);
1893 }
1894
1895 /*
1896  * Flush cached sequence information.
1897  */
1898 void
1899 ResetSequenceCaches(void)
1900 {
1901         if (seqhashtab)
1902         {
1903                 hash_destroy(seqhashtab);
1904                 seqhashtab = NULL;
1905         }
1906
1907         last_used_seq = NULL;
1908 }
1909
1910 /*
1911  * Mask a Sequence page before performing consistency checks on it.
1912  */
1913 void
1914 seq_mask(char *page, BlockNumber blkno)
1915 {
1916         mask_page_lsn(page);
1917
1918         mask_unused_space(page);
1919 }