]> granicus.if.org Git - postgresql/blob - src/backend/commands/sequence.c
Create a 'type cache' that keeps track of the data needed for any particular
[postgresql] / src / backend / commands / sequence.c
1 /*-------------------------------------------------------------------------
2  *
3  * sequence.c
4  *        PostgreSQL sequences support code.
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.102 2003/08/08 21:41:32 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "catalog/namespace.h"
19 #include "catalog/pg_type.h"
20 #include "commands/defrem.h"
21 #include "commands/tablecmds.h"
22 #include "commands/sequence.h"
23 #include "miscadmin.h"
24 #include "utils/acl.h"
25 #include "utils/builtins.h"
26
27 /*
28  * We don't want to log each fetching of a value from a sequence,
29  * so we pre-log a few fetches in advance. In the event of
30  * crash we can lose as much as we pre-logged.
31  */
32 #define SEQ_LOG_VALS    32
33
34 /*
35  * The "special area" of a sequence's buffer page looks like this.
36  */
37 #define SEQ_MAGIC         0x1717
38
39 typedef struct sequence_magic
40 {
41         uint32          magic;
42 } sequence_magic;
43
44 /*
45  * We store a SeqTable item for every sequence we have touched in the current
46  * session.  This is needed to hold onto nextval/currval state.  (We can't
47  * rely on the relcache, since it's only, well, a cache, and may decide to
48  * discard entries.)
49  *
50  * XXX We use linear search to find pre-existing SeqTable entries.      This is
51  * good when only a small number of sequences are touched in a session, but
52  * would suck with many different sequences.  Perhaps use a hashtable someday.
53  */
54 typedef struct SeqTableData
55 {
56         struct SeqTableData *next;      /* link to next SeqTable object */
57         Oid                     relid;                  /* pg_class OID of this sequence */
58         TransactionId xid;                      /* xact in which we last did a seq op */
59         int64           last;                   /* value last returned by nextval */
60         int64           cached;                 /* last value already cached for nextval */
61         /* if last != cached, we have not used up all the cached values */
62         int64           increment;              /* copy of sequence's increment field */
63 } SeqTableData;
64
65 typedef SeqTableData *SeqTable;
66
67 static SeqTable seqtab = NULL;  /* Head of list of SeqTable items */
68
69
70 static void init_sequence(RangeVar *relation,
71                           SeqTable *p_elm, Relation *p_rel);
72 static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
73 static void init_params(List *options, Form_pg_sequence new);
74 static void do_setval(RangeVar *sequence, int64 next, bool iscalled);
75
76 /*
77  * DefineSequence
78  *                              Creates a new sequence relation
79  */
80 void
81 DefineSequence(CreateSeqStmt *seq)
82 {
83         FormData_pg_sequence new;
84         CreateStmt *stmt = makeNode(CreateStmt);
85         Oid                     seqoid;
86         Relation        rel;
87         Buffer          buf;
88         PageHeader      page;
89         sequence_magic *sm;
90         HeapTuple       tuple;
91         TupleDesc       tupDesc;
92         Datum           value[SEQ_COL_LASTCOL];
93         char            null[SEQ_COL_LASTCOL];
94         int                     i;
95         NameData        name;
96
97         /* Values are NULL (or false) by default */
98         new.last_value = 0;
99         new.increment_by = 0;
100         new.max_value = 0;
101         new.min_value = 0;
102         new.cache_value = 0;
103         new.is_cycled = false;
104
105         /* Check and set values */
106         init_params(seq->options, &new);
107
108         /*
109          * Create relation (and fill *null & *value)
110          */
111         stmt->tableElts = NIL;
112         for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
113         {
114                 ColumnDef  *coldef;
115                 TypeName   *typnam;
116
117                 typnam = makeNode(TypeName);
118                 typnam->setof = FALSE;
119                 typnam->arrayBounds = NIL;
120                 typnam->typmod = -1;
121
122                 coldef = makeNode(ColumnDef);
123                 coldef->typename = typnam;
124                 coldef->inhcount = 0;
125                 coldef->is_local = true;
126                 coldef->is_not_null = true;
127                 coldef->raw_default = NULL;
128                 coldef->cooked_default = NULL;
129                 coldef->constraints = NIL;
130                 coldef->support = NULL;
131
132                 null[i - 1] = ' ';
133
134                 switch (i)
135                 {
136                         case SEQ_COL_NAME:
137                                 typnam->typeid = NAMEOID;
138                                 coldef->colname = "sequence_name";
139                                 namestrcpy(&name, seq->sequence->relname);
140                                 value[i - 1] = NameGetDatum(&name);
141                                 break;
142                         case SEQ_COL_LASTVAL:
143                                 typnam->typeid = INT8OID;
144                                 coldef->colname = "last_value";
145                                 value[i - 1] = Int64GetDatumFast(new.last_value);
146                                 break;
147                         case SEQ_COL_INCBY:
148                                 typnam->typeid = INT8OID;
149                                 coldef->colname = "increment_by";
150                                 value[i - 1] = Int64GetDatumFast(new.increment_by);
151                                 break;
152                         case SEQ_COL_MAXVALUE:
153                                 typnam->typeid = INT8OID;
154                                 coldef->colname = "max_value";
155                                 value[i - 1] = Int64GetDatumFast(new.max_value);
156                                 break;
157                         case SEQ_COL_MINVALUE:
158                                 typnam->typeid = INT8OID;
159                                 coldef->colname = "min_value";
160                                 value[i - 1] = Int64GetDatumFast(new.min_value);
161                                 break;
162                         case SEQ_COL_CACHE:
163                                 typnam->typeid = INT8OID;
164                                 coldef->colname = "cache_value";
165                                 value[i - 1] = Int64GetDatumFast(new.cache_value);
166                                 break;
167                         case SEQ_COL_LOG:
168                                 typnam->typeid = INT8OID;
169                                 coldef->colname = "log_cnt";
170                                 value[i - 1] = Int64GetDatum((int64) 1);
171                                 break;
172                         case SEQ_COL_CYCLE:
173                                 typnam->typeid = BOOLOID;
174                                 coldef->colname = "is_cycled";
175                                 value[i - 1] = BoolGetDatum(new.is_cycled);
176                                 break;
177                         case SEQ_COL_CALLED:
178                                 typnam->typeid = BOOLOID;
179                                 coldef->colname = "is_called";
180                                 value[i - 1] = BoolGetDatum(false);
181                                 break;
182                 }
183                 stmt->tableElts = lappend(stmt->tableElts, coldef);
184         }
185
186         stmt->relation = seq->sequence;
187         stmt->inhRelations = NIL;
188         stmt->constraints = NIL;
189         stmt->hasoids = false;
190         stmt->oncommit = ONCOMMIT_NOOP;
191
192         seqoid = DefineRelation(stmt, RELKIND_SEQUENCE);
193
194         rel = heap_open(seqoid, AccessExclusiveLock);
195         tupDesc = RelationGetDescr(rel);
196
197         /* Initialize first page of relation with special magic number */
198
199         buf = ReadBuffer(rel, P_NEW);
200
201         if (!BufferIsValid(buf))
202                 elog(ERROR, "ReadBuffer failed");
203
204         Assert(BufferGetBlockNumber(buf) == 0);
205
206         page = (PageHeader) BufferGetPage(buf);
207
208         PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
209         sm = (sequence_magic *) PageGetSpecialPointer(page);
210         sm->magic = SEQ_MAGIC;
211
212         /* hack: ensure heap_insert will insert on the just-created page */
213         rel->rd_targblock = 0;
214
215         /* Now form & insert sequence tuple */
216         tuple = heap_formtuple(tupDesc, value, null);
217         simple_heap_insert(rel, tuple);
218
219         Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);
220
221         /*
222          * Two special hacks here:
223          *
224          * 1. Since VACUUM does not process sequences, we have to force the tuple
225          * to have xmin = FrozenTransactionId now.      Otherwise it would become
226          * invisible to SELECTs after 2G transactions.  It is okay to do this
227          * because if the current transaction aborts, no other xact will ever
228          * examine the sequence tuple anyway.
229          *
230          * 2. Even though heap_insert emitted a WAL log record, we have to emit
231          * an XLOG_SEQ_LOG record too, since (a) the heap_insert record will
232          * not have the right xmin, and (b) REDO of the heap_insert record
233          * would re-init page and sequence magic number would be lost.  This
234          * means two log records instead of one :-(
235          */
236         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
237
238         START_CRIT_SECTION();
239
240         {
241                 /*
242                  * Note that the "tuple" structure is still just a local tuple
243                  * record created by heap_formtuple; its t_data pointer doesn't
244                  * point at the disk buffer.  To scribble on the disk buffer we
245                  * need to fetch the item pointer.      But do the same to the local
246                  * tuple, since that will be the source for the WAL log record,
247                  * below.
248                  */
249                 ItemId          itemId;
250                 Item            item;
251
252                 itemId = PageGetItemId((Page) page, FirstOffsetNumber);
253                 item = PageGetItem((Page) page, itemId);
254
255                 HeapTupleHeaderSetXmin((HeapTupleHeader) item, FrozenTransactionId);
256                 ((HeapTupleHeader) item)->t_infomask |= HEAP_XMIN_COMMITTED;
257
258                 HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
259                 tuple->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
260         }
261
262         /* XLOG stuff */
263         if (!rel->rd_istemp)
264         {
265                 xl_seq_rec      xlrec;
266                 XLogRecPtr      recptr;
267                 XLogRecData rdata[2];
268                 Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
269
270                 /* We do not log first nextval call, so "advance" sequence here */
271                 /* Note we are scribbling on local tuple, not the disk buffer */
272                 newseq->is_called = true;
273                 newseq->log_cnt = 0;
274
275                 xlrec.node = rel->rd_node;
276                 rdata[0].buffer = InvalidBuffer;
277                 rdata[0].data = (char *) &xlrec;
278                 rdata[0].len = sizeof(xl_seq_rec);
279                 rdata[0].next = &(rdata[1]);
280
281                 rdata[1].buffer = InvalidBuffer;
282                 rdata[1].data = (char *) tuple->t_data;
283                 rdata[1].len = tuple->t_len;
284                 rdata[1].next = NULL;
285
286                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
287
288                 PageSetLSN(page, recptr);
289                 PageSetSUI(page, ThisStartUpID);
290         }
291
292         END_CRIT_SECTION();
293
294         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
295         WriteBuffer(buf);
296         heap_close(rel, NoLock);
297 }
298
299 /*
300  * AlterSequence
301  *
302  * Modify the defition of a sequence relation
303  */
304 void
305 AlterSequence(AlterSeqStmt *stmt)
306 {
307         SeqTable        elm;
308         Relation        seqrel;
309         Buffer          buf;
310         Page            page;
311         Form_pg_sequence seq;
312         FormData_pg_sequence new;
313
314         /* open and AccessShareLock sequence */
315         init_sequence(stmt->sequence, &elm, &seqrel);
316
317         /* allow DROP to sequence owner only */
318         if (!pg_class_ownercheck(elm->relid, GetUserId()))
319                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
320                                            stmt->sequence->relname);
321
322         /* lock page' buffer and read tuple into new sequence structure */
323         seq = read_info(elm, seqrel, &buf);
324         page = BufferGetPage(buf);
325
326         new.increment_by = seq->increment_by;
327         new.max_value = seq->max_value;
328         new.min_value = seq->min_value;
329         new.cache_value = seq->cache_value;
330         new.is_cycled = seq->is_cycled;
331         new.last_value = seq->last_value;
332
333         /* Check and set values */
334         init_params(stmt->options, &new);
335
336         seq->increment_by = new.increment_by;
337         seq->max_value = new.max_value;
338         seq->min_value = new.min_value;
339         seq->cache_value = new.cache_value;
340         seq->is_cycled = new.is_cycled;
341         if (seq->last_value != new.last_value)
342         {
343                 seq->last_value = new.last_value;
344                 seq->is_called = false;
345                 seq->log_cnt = 1;
346         }
347
348         /* save info in local cache */
349         elm->last = new.last_value; /* last returned number */
350         elm->cached = new.last_value;           /* last cached number (forget
351                                                                                  * cached values) */
352
353         START_CRIT_SECTION();
354
355         /* XLOG stuff */
356         if (!seqrel->rd_istemp)
357         {
358                 xl_seq_rec      xlrec;
359                 XLogRecPtr      recptr;
360                 XLogRecData rdata[2];
361
362                 xlrec.node = seqrel->rd_node;
363                 rdata[0].buffer = InvalidBuffer;
364                 rdata[0].data = (char *) &xlrec;
365                 rdata[0].len = sizeof(xl_seq_rec);
366                 rdata[0].next = &(rdata[1]);
367
368                 rdata[1].buffer = InvalidBuffer;
369                 rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
370                 rdata[1].len = ((PageHeader) page)->pd_special -
371                         ((PageHeader) page)->pd_upper;
372                 rdata[1].next = NULL;
373
374                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
375
376                 PageSetLSN(page, recptr);
377                 PageSetSUI(page, ThisStartUpID);
378         }
379
380         END_CRIT_SECTION();
381
382         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
383
384         WriteBuffer(buf);
385
386         relation_close(seqrel, NoLock);
387 }
388
389
390 Datum
391 nextval(PG_FUNCTION_ARGS)
392 {
393         text       *seqin = PG_GETARG_TEXT_P(0);
394         RangeVar   *sequence;
395         SeqTable        elm;
396         Relation        seqrel;
397         Buffer          buf;
398         Page            page;
399         Form_pg_sequence seq;
400         int64           incby,
401                                 maxv,
402                                 minv,
403                                 cache,
404                                 log,
405                                 fetch,
406                                 last;
407         int64           result,
408                                 next,
409                                 rescnt = 0;
410         bool            logit = false;
411
412         sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
413                                                                                                                          "nextval"));
414
415         /* open and AccessShareLock sequence */
416         init_sequence(sequence, &elm, &seqrel);
417
418         if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
419                 ereport(ERROR,
420                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
421                                  errmsg("permission denied for sequence %s",
422                                                 sequence->relname)));
423
424         if (elm->last != elm->cached)           /* some numbers were cached */
425         {
426                 elm->last += elm->increment;
427                 relation_close(seqrel, NoLock);
428                 PG_RETURN_INT64(elm->last);
429         }
430
431         /* lock page' buffer and read tuple */
432         seq = read_info(elm, seqrel, &buf);
433         page = BufferGetPage(buf);
434
435         last = next = result = seq->last_value;
436         incby = seq->increment_by;
437         maxv = seq->max_value;
438         minv = seq->min_value;
439         fetch = cache = seq->cache_value;
440         log = seq->log_cnt;
441
442         if (!seq->is_called)
443         {
444                 rescnt++;                               /* last_value if not called */
445                 fetch--;
446                 log--;
447         }
448
449         /*
450          * Decide whether we should emit a WAL log record.      If so, force up
451          * the fetch count to grab SEQ_LOG_VALS more values than we actually
452          * need to cache.  (These will then be usable without logging.)
453          *
454          * If this is the first nextval after a checkpoint, we must force a new
455          * WAL record to be written anyway, else replay starting from the
456          * checkpoint would fail to advance the sequence past the logged
457          * values.      In this case we may as well fetch extra values.
458          */
459         if (log < fetch)
460         {
461                 /* forced log to satisfy local demand for values */
462                 fetch = log = fetch + SEQ_LOG_VALS;
463                 logit = true;
464         }
465         else
466         {
467                 XLogRecPtr      redoptr = GetRedoRecPtr();
468
469                 if (XLByteLE(PageGetLSN(page), redoptr))
470                 {
471                         /* last update of seq was before checkpoint */
472                         fetch = log = fetch + SEQ_LOG_VALS;
473                         logit = true;
474                 }
475         }
476
477         while (fetch)                           /* try to fetch cache [+ log ] numbers */
478         {
479                 /*
480                  * Check MAXVALUE for ascending sequences and MINVALUE for
481                  * descending sequences
482                  */
483                 if (incby > 0)
484                 {
485                         /* ascending sequence */
486                         if ((maxv >= 0 && next > maxv - incby) ||
487                                 (maxv < 0 && next + incby > maxv))
488                         {
489                                 if (rescnt > 0)
490                                         break;          /* stop fetching */
491                                 if (!seq->is_cycled)
492                                 {
493                                         char            buf[100];
494
495                                         snprintf(buf, sizeof(buf), INT64_FORMAT, maxv);
496                                         ereport(ERROR,
497                                           (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
498                                            errmsg("%s.nextval: reached MAXVALUE (%s)",
499                                                           sequence->relname, buf)));
500                                 }
501                                 next = minv;
502                         }
503                         else
504                                 next += incby;
505                 }
506                 else
507                 {
508                         /* descending sequence */
509                         if ((minv < 0 && next < minv - incby) ||
510                                 (minv >= 0 && next + incby < minv))
511                         {
512                                 if (rescnt > 0)
513                                         break;          /* stop fetching */
514                                 if (!seq->is_cycled)
515                                 {
516                                         char            buf[100];
517
518                                         snprintf(buf, sizeof(buf), INT64_FORMAT, minv);
519                                         ereport(ERROR,
520                                           (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
521                                            errmsg("%s.nextval: reached MINVALUE (%s)",
522                                                           sequence->relname, buf)));
523                                 }
524                                 next = maxv;
525                         }
526                         else
527                                 next += incby;
528                 }
529                 fetch--;
530                 if (rescnt < cache)
531                 {
532                         log--;
533                         rescnt++;
534                         last = next;
535                         if (rescnt == 1)        /* if it's first result - */
536                                 result = next;  /* it's what to return */
537                 }
538         }
539
540         log -= fetch;                           /* adjust for any unfetched numbers */
541         Assert(log >= 0);
542
543         /* save info in local cache */
544         elm->last = result;                     /* last returned number */
545         elm->cached = last;                     /* last fetched number */
546
547         START_CRIT_SECTION();
548
549         /* XLOG stuff */
550         if (logit && !seqrel->rd_istemp)
551         {
552                 xl_seq_rec      xlrec;
553                 XLogRecPtr      recptr;
554                 XLogRecData rdata[2];
555
556                 xlrec.node = seqrel->rd_node;
557                 rdata[0].buffer = InvalidBuffer;
558                 rdata[0].data = (char *) &xlrec;
559                 rdata[0].len = sizeof(xl_seq_rec);
560                 rdata[0].next = &(rdata[1]);
561
562                 /* set values that will be saved in xlog */
563                 seq->last_value = next;
564                 seq->is_called = true;
565                 seq->log_cnt = 0;
566
567                 rdata[1].buffer = InvalidBuffer;
568                 rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
569                 rdata[1].len = ((PageHeader) page)->pd_special -
570                         ((PageHeader) page)->pd_upper;
571                 rdata[1].next = NULL;
572
573                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
574
575                 PageSetLSN(page, recptr);
576                 PageSetSUI(page, ThisStartUpID);
577         }
578
579         /* update on-disk data */
580         seq->last_value = last;         /* last fetched number */
581         seq->is_called = true;
582         seq->log_cnt = log;                     /* how much is logged */
583
584         END_CRIT_SECTION();
585
586         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
587
588         WriteBuffer(buf);
589
590         relation_close(seqrel, NoLock);
591
592         PG_RETURN_INT64(result);
593 }
594
595 Datum
596 currval(PG_FUNCTION_ARGS)
597 {
598         text       *seqin = PG_GETARG_TEXT_P(0);
599         RangeVar   *sequence;
600         SeqTable        elm;
601         Relation        seqrel;
602         int64           result;
603
604         sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
605                                                                                                                          "currval"));
606
607         /* open and AccessShareLock sequence */
608         init_sequence(sequence, &elm, &seqrel);
609
610         if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
611                 ereport(ERROR,
612                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
613                                  errmsg("permission denied for sequence %s",
614                                                 sequence->relname)));
615
616         if (elm->increment == 0)        /* nextval/read_info were not called */
617                 ereport(ERROR,
618                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
619                                  errmsg("%s.currval is not yet defined in this session",
620                                                 sequence->relname)));
621
622         result = elm->last;
623
624         relation_close(seqrel, NoLock);
625
626         PG_RETURN_INT64(result);
627 }
628
629 /*
630  * Main internal procedure that handles 2 & 3 arg forms of SETVAL.
631  *
632  * Note that the 3 arg version (which sets the is_called flag) is
633  * only for use in pg_dump, and setting the is_called flag may not
634  * work if multiple users are attached to the database and referencing
635  * the sequence (unlikely if pg_dump is restoring it).
636  *
637  * It is necessary to have the 3 arg version so that pg_dump can
638  * restore the state of a sequence exactly during data-only restores -
639  * it is the only way to clear the is_called flag in an existing
640  * sequence.
641  */
642 static void
643 do_setval(RangeVar *sequence, int64 next, bool iscalled)
644 {
645         SeqTable        elm;
646         Relation        seqrel;
647         Buffer          buf;
648         Form_pg_sequence seq;
649
650         /* open and AccessShareLock sequence */
651         init_sequence(sequence, &elm, &seqrel);
652
653         if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
654                 ereport(ERROR,
655                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
656                                  errmsg("permission denied for sequence %s",
657                                                 sequence->relname)));
658
659         /* lock page' buffer and read tuple */
660         seq = read_info(elm, seqrel, &buf);
661
662         if ((next < seq->min_value) || (next > seq->max_value))
663         {
664                 char            bufv[100],
665                                         bufm[100],
666                                         bufx[100];
667
668                 snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
669                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, seq->min_value);
670                 snprintf(bufx, sizeof(bufx), INT64_FORMAT, seq->max_value);
671                 ereport(ERROR,
672                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
673                                  errmsg("%s.setval: value %s is out of bounds (%s..%s)",
674                                                 sequence->relname, bufv, bufm, bufx)));
675         }
676
677         /* save info in local cache */
678         elm->last = next;                       /* last returned number */
679         elm->cached = next;                     /* last cached number (forget cached
680                                                                  * values) */
681
682         START_CRIT_SECTION();
683
684         /* XLOG stuff */
685         if (!seqrel->rd_istemp)
686         {
687                 xl_seq_rec      xlrec;
688                 XLogRecPtr      recptr;
689                 XLogRecData rdata[2];
690                 Page            page = BufferGetPage(buf);
691
692                 xlrec.node = seqrel->rd_node;
693                 rdata[0].buffer = InvalidBuffer;
694                 rdata[0].data = (char *) &xlrec;
695                 rdata[0].len = sizeof(xl_seq_rec);
696                 rdata[0].next = &(rdata[1]);
697
698                 /* set values that will be saved in xlog */
699                 seq->last_value = next;
700                 seq->is_called = true;
701                 seq->log_cnt = 0;
702
703                 rdata[1].buffer = InvalidBuffer;
704                 rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
705                 rdata[1].len = ((PageHeader) page)->pd_special -
706                         ((PageHeader) page)->pd_upper;
707                 rdata[1].next = NULL;
708
709                 recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
710
711                 PageSetLSN(page, recptr);
712                 PageSetSUI(page, ThisStartUpID);
713         }
714
715         /* save info in sequence relation */
716         seq->last_value = next;         /* last fetched number */
717         seq->is_called = iscalled;
718         seq->log_cnt = (iscalled) ? 0 : 1;
719
720         END_CRIT_SECTION();
721
722         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
723
724         WriteBuffer(buf);
725
726         relation_close(seqrel, NoLock);
727 }
728
729 /*
730  * Implement the 2 arg setval procedure.
731  * See do_setval for discussion.
732  */
733 Datum
734 setval(PG_FUNCTION_ARGS)
735 {
736         text       *seqin = PG_GETARG_TEXT_P(0);
737         int64           next = PG_GETARG_INT64(1);
738         RangeVar   *sequence;
739
740         sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
741                                                                                                                           "setval"));
742
743         do_setval(sequence, next, true);
744
745         PG_RETURN_INT64(next);
746 }
747
748 /*
749  * Implement the 3 arg setval procedure.
750  * See do_setval for discussion.
751  */
752 Datum
753 setval_and_iscalled(PG_FUNCTION_ARGS)
754 {
755         text       *seqin = PG_GETARG_TEXT_P(0);
756         int64           next = PG_GETARG_INT64(1);
757         bool            iscalled = PG_GETARG_BOOL(2);
758         RangeVar   *sequence;
759
760         sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
761                                                                                                                           "setval"));
762
763         do_setval(sequence, next, iscalled);
764
765         PG_RETURN_INT64(next);
766 }
767
768
769 /*
770  * Given a relation name, open and lock the sequence.  p_elm and p_rel are
771  * output parameters.
772  */
773 static void
774 init_sequence(RangeVar *relation, SeqTable *p_elm, Relation *p_rel)
775 {
776         Oid                     relid = RangeVarGetRelid(relation, false);
777         TransactionId thisxid = GetCurrentTransactionId();
778         SeqTable        elm;
779         Relation        seqrel;
780
781         /* Look to see if we already have a seqtable entry for relation */
782         for (elm = seqtab; elm != NULL; elm = elm->next)
783         {
784                 if (elm->relid == relid)
785                         break;
786         }
787
788         /*
789          * Open the sequence relation, acquiring AccessShareLock if we don't
790          * already have a lock in the current xact.
791          */
792         if (elm == NULL || elm->xid != thisxid)
793                 seqrel = relation_open(relid, AccessShareLock);
794         else
795                 seqrel = relation_open(relid, NoLock);
796
797         if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
798                 ereport(ERROR,
799                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
800                                  errmsg("\"%s\" is not a sequence",
801                                                 relation->relname)));
802
803         /*
804          * Allocate new seqtable entry if we didn't find one.
805          *
806          * NOTE: seqtable entries remain in the list for the life of a backend.
807          * If the sequence itself is deleted then the entry becomes wasted
808          * memory, but it's small enough that this should not matter.
809          */
810         if (elm == NULL)
811         {
812                 /*
813                  * Time to make a new seqtable entry.  These entries live as long
814                  * as the backend does, so we use plain malloc for them.
815                  */
816                 elm = (SeqTable) malloc(sizeof(SeqTableData));
817                 if (elm == NULL)
818                         ereport(ERROR,
819                                         (errcode(ERRCODE_OUT_OF_MEMORY),
820                                          errmsg("out of memory")));
821                 elm->relid = relid;
822                 /* increment is set to 0 until we do read_info (see currval) */
823                 elm->last = elm->cached = elm->increment = 0;
824                 elm->next = seqtab;
825                 seqtab = elm;
826         }
827
828         /* Flag that we have a lock in the current xact. */
829         elm->xid = thisxid;
830
831         *p_elm = elm;
832         *p_rel = seqrel;
833 }
834
835
836 /* Given an opened relation, lock the page buffer and find the tuple */
837 static Form_pg_sequence
838 read_info(SeqTable elm, Relation rel, Buffer *buf)
839 {
840         PageHeader      page;
841         ItemId          lp;
842         HeapTupleData tuple;
843         sequence_magic *sm;
844         Form_pg_sequence seq;
845
846         if (rel->rd_nblocks > 1)
847                 elog(ERROR, "invalid number of blocks in sequence \"%s\"",
848                          RelationGetRelationName(rel));
849
850         *buf = ReadBuffer(rel, 0);
851         if (!BufferIsValid(*buf))
852                 elog(ERROR, "ReadBuffer failed");
853
854         LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
855
856         page = (PageHeader) BufferGetPage(*buf);
857         sm = (sequence_magic *) PageGetSpecialPointer(page);
858
859         if (sm->magic != SEQ_MAGIC)
860                 elog(ERROR, "bad magic number in sequence \"%s\": %08X",
861                          RelationGetRelationName(rel), sm->magic);
862
863         lp = PageGetItemId(page, FirstOffsetNumber);
864         Assert(ItemIdIsUsed(lp));
865         tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
866
867         seq = (Form_pg_sequence) GETSTRUCT(&tuple);
868
869         elm->increment = seq->increment_by;
870
871         return seq;
872 }
873
874
875 static void
876 init_params(List *options, Form_pg_sequence new)
877 {
878         DefElem    *last_value = NULL;
879         DefElem    *increment_by = NULL;
880         DefElem    *max_value = NULL;
881         DefElem    *min_value = NULL;
882         DefElem    *cache_value = NULL;
883         bool            is_cycled_set = false;
884         List       *option;
885
886         foreach(option, options)
887         {
888                 DefElem    *defel = (DefElem *) lfirst(option);
889
890                 if (strcmp(defel->defname, "increment") == 0)
891                 {
892                         if (increment_by)
893                                 ereport(ERROR,
894                                                 (errcode(ERRCODE_SYNTAX_ERROR),
895                                                  errmsg("conflicting or redundant options")));
896                         increment_by = defel;
897                 }
898
899                 /*
900                  * start is for a new sequence restart is for alter
901                  */
902                 else if (strcmp(defel->defname, "start") == 0 ||
903                                  strcmp(defel->defname, "restart") == 0)
904                 {
905                         if (last_value)
906                                 ereport(ERROR,
907                                                 (errcode(ERRCODE_SYNTAX_ERROR),
908                                                  errmsg("conflicting or redundant options")));
909                         last_value = defel;
910                 }
911                 else if (strcmp(defel->defname, "maxvalue") == 0)
912                 {
913                         if (max_value)
914                                 ereport(ERROR,
915                                                 (errcode(ERRCODE_SYNTAX_ERROR),
916                                                  errmsg("conflicting or redundant options")));
917                         max_value = defel;
918                 }
919                 else if (strcmp(defel->defname, "minvalue") == 0)
920                 {
921                         if (min_value)
922                                 ereport(ERROR,
923                                                 (errcode(ERRCODE_SYNTAX_ERROR),
924                                                  errmsg("conflicting or redundant options")));
925                         min_value = defel;
926                 }
927                 else if (strcmp(defel->defname, "cache") == 0)
928                 {
929                         if (cache_value)
930                                 ereport(ERROR,
931                                                 (errcode(ERRCODE_SYNTAX_ERROR),
932                                                  errmsg("conflicting or redundant options")));
933                         cache_value = defel;
934                 }
935                 else if (strcmp(defel->defname, "cycle") == 0)
936                 {
937                         if (is_cycled_set)
938                                 ereport(ERROR,
939                                                 (errcode(ERRCODE_SYNTAX_ERROR),
940                                                  errmsg("conflicting or redundant options")));
941                         is_cycled_set = true;
942                         new->is_cycled = (defel->arg != NULL);
943                 }
944                 else
945                         elog(ERROR, "option \"%s\" not recognized",
946                                  defel->defname);
947         }
948
949         /* INCREMENT BY */
950         if (new->increment_by == 0 && increment_by == (DefElem *) NULL)
951                 new->increment_by = 1;
952         else if (increment_by != (DefElem *) NULL)
953         {
954                 new->increment_by = defGetInt64(increment_by);
955                 if (new->increment_by == 0)
956                         ereport(ERROR,
957                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958                                          errmsg("cannot increment by zero")));
959         }
960
961         /* MAXVALUE */
962         if ((new->max_value == 0 && max_value == (DefElem *) NULL)
963                 || (max_value != (DefElem *) NULL && !max_value->arg))
964         {
965                 if (new->increment_by > 0)
966                         new->max_value = SEQ_MAXVALUE;          /* ascending seq */
967                 else
968                         new->max_value = -1;    /* descending seq */
969         }
970         else if (max_value != (DefElem *) NULL)
971                 new->max_value = defGetInt64(max_value);
972
973         /* MINVALUE */
974         if ((new->min_value == 0 && min_value == (DefElem *) NULL)
975                 || (min_value != (DefElem *) NULL && !min_value->arg))
976         {
977                 if (new->increment_by > 0)
978                         new->min_value = 1; /* ascending seq */
979                 else
980                         new->min_value = SEQ_MINVALUE;          /* descending seq */
981         }
982         else if (min_value != (DefElem *) NULL)
983                 new->min_value = defGetInt64(min_value);
984
985         if (new->min_value >= new->max_value)
986         {
987                 char            bufm[100],
988                                         bufx[100];
989
990                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
991                 snprintf(bufx, sizeof(bufx), INT64_FORMAT, new->max_value);
992                 ereport(ERROR,
993                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
994                                  errmsg("MINVALUE (%s) must be less than MAXVALUE (%s)",
995                                                 bufm, bufx)));
996         }
997
998         /* START WITH */
999         if (new->last_value == 0 && last_value == (DefElem *) NULL)
1000         {
1001                 if (new->increment_by > 0)
1002                         new->last_value = new->min_value;       /* ascending seq */
1003                 else
1004                         new->last_value = new->max_value;       /* descending seq */
1005         }
1006         else if (last_value != (DefElem *) NULL)
1007                 new->last_value = defGetInt64(last_value);
1008
1009         if (new->last_value < new->min_value)
1010         {
1011                 char            bufs[100],
1012                                         bufm[100];
1013
1014                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
1015                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->min_value);
1016                 ereport(ERROR,
1017                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1018                           errmsg("START value (%s) can't be less than MINVALUE (%s)",
1019                                          bufs, bufm)));
1020         }
1021         if (new->last_value > new->max_value)
1022         {
1023                 char            bufs[100],
1024                                         bufm[100];
1025
1026                 snprintf(bufs, sizeof(bufs), INT64_FORMAT, new->last_value);
1027                 snprintf(bufm, sizeof(bufm), INT64_FORMAT, new->max_value);
1028                 ereport(ERROR,
1029                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1030                    errmsg("START value (%s) can't be greater than MAXVALUE (%s)",
1031                                   bufs, bufm)));
1032         }
1033
1034         /* CACHE */
1035         if (cache_value == (DefElem *) NULL)
1036                 new->cache_value = 1;
1037         else if ((new->cache_value = defGetInt64(cache_value)) <= 0)
1038         {
1039                 char            buf[100];
1040
1041                 snprintf(buf, sizeof(buf), INT64_FORMAT, new->cache_value);
1042                 ereport(ERROR,
1043                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1044                                  errmsg("CACHE (%s) must be greater than zero", buf)));
1045         }
1046 }
1047
1048
1049 void
1050 seq_redo(XLogRecPtr lsn, XLogRecord *record)
1051 {
1052         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1053         Relation        reln;
1054         Buffer          buffer;
1055         Page            page;
1056         char       *item;
1057         Size            itemsz;
1058         xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
1059         sequence_magic *sm;
1060
1061         if (info != XLOG_SEQ_LOG)
1062                 elog(PANIC, "seq_redo: unknown op code %u", info);
1063
1064         reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
1065         if (!RelationIsValid(reln))
1066                 return;
1067
1068         buffer = XLogReadBuffer(true, reln, 0);
1069         if (!BufferIsValid(buffer))
1070                 elog(PANIC, "seq_redo: can't read block of %u/%u",
1071                          xlrec->node.tblNode, xlrec->node.relNode);
1072
1073         page = (Page) BufferGetPage(buffer);
1074
1075         /* Always reinit the page and reinstall the magic number */
1076         /* See comments in DefineSequence */
1077         PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
1078         sm = (sequence_magic *) PageGetSpecialPointer(page);
1079         sm->magic = SEQ_MAGIC;
1080
1081         item = (char *) xlrec + sizeof(xl_seq_rec);
1082         itemsz = record->xl_len - sizeof(xl_seq_rec);
1083         itemsz = MAXALIGN(itemsz);
1084         if (PageAddItem(page, (Item) item, itemsz,
1085                                         FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
1086                 elog(PANIC, "seq_redo: failed to add item to page");
1087
1088         PageSetLSN(page, lsn);
1089         PageSetSUI(page, ThisStartUpID);
1090         UnlockAndWriteBuffer(buffer);
1091 }
1092
1093 void
1094 seq_undo(XLogRecPtr lsn, XLogRecord *record)
1095 {
1096 }
1097
1098 void
1099 seq_desc(char *buf, uint8 xl_info, char *rec)
1100 {
1101         uint8           info = xl_info & ~XLR_INFO_MASK;
1102         xl_seq_rec *xlrec = (xl_seq_rec *) rec;
1103
1104         if (info == XLOG_SEQ_LOG)
1105                 strcat(buf, "log: ");
1106         else
1107         {
1108                 strcat(buf, "UNKNOWN");
1109                 return;
1110         }
1111
1112         sprintf(buf + strlen(buf), "node %u/%u",
1113                         xlrec->node.tblNode, xlrec->node.relNode);
1114 }