]> granicus.if.org Git - postgresql/blob - src/backend/catalog/heap.c
Refine our definition of what constitutes a system relation.
[postgresql] / src / backend / catalog / heap.c
1 /*-------------------------------------------------------------------------
2  *
3  * heap.c
4  *        code to create and destroy POSTGRES heap relations
5  *
6  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/catalog/heap.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_create()                   - Create an uncataloged heap relation
16  *              heap_create_with_catalog() - Create a cataloged relation
17  *              heap_drop_with_catalog() - Removes named relation from catalogs
18  *
19  * NOTES
20  *        this code taken from access/heap/create.c, which contains
21  *        the old heap_create_with_catalog, amcreate, and amdestroy.
22  *        those routines will soon call these routines using the function
23  *        manager,
24  *        just like the poorly named "NewXXX" routines do.      The
25  *        "New" routines are all going to die soon, once and for all!
26  *              -cim 1/13/91
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31
32 #include "access/htup_details.h"
33 #include "access/multixact.h"
34 #include "access/sysattr.h"
35 #include "access/transam.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/dependency.h"
39 #include "catalog/heap.h"
40 #include "catalog/index.h"
41 #include "catalog/objectaccess.h"
42 #include "catalog/pg_attrdef.h"
43 #include "catalog/pg_collation.h"
44 #include "catalog/pg_constraint.h"
45 #include "catalog/pg_foreign_table.h"
46 #include "catalog/pg_inherits.h"
47 #include "catalog/pg_namespace.h"
48 #include "catalog/pg_statistic.h"
49 #include "catalog/pg_tablespace.h"
50 #include "catalog/pg_type.h"
51 #include "catalog/pg_type_fn.h"
52 #include "catalog/storage.h"
53 #include "catalog/storage_xlog.h"
54 #include "commands/tablecmds.h"
55 #include "commands/typecmds.h"
56 #include "miscadmin.h"
57 #include "nodes/nodeFuncs.h"
58 #include "optimizer/var.h"
59 #include "parser/parse_coerce.h"
60 #include "parser/parse_collate.h"
61 #include "parser/parse_expr.h"
62 #include "parser/parse_relation.h"
63 #include "storage/predicate.h"
64 #include "storage/smgr.h"
65 #include "utils/acl.h"
66 #include "utils/builtins.h"
67 #include "utils/fmgroids.h"
68 #include "utils/inval.h"
69 #include "utils/lsyscache.h"
70 #include "utils/rel.h"
71 #include "utils/snapmgr.h"
72 #include "utils/syscache.h"
73 #include "utils/tqual.h"
74
75
76 /* Potentially set by contrib/pg_upgrade_support functions */
77 Oid                     binary_upgrade_next_heap_pg_class_oid = InvalidOid;
78 Oid                     binary_upgrade_next_toast_pg_class_oid = InvalidOid;
79
80 static void AddNewRelationTuple(Relation pg_class_desc,
81                                         Relation new_rel_desc,
82                                         Oid new_rel_oid,
83                                         Oid new_type_oid,
84                                         Oid reloftype,
85                                         Oid relowner,
86                                         char relkind,
87                                         Datum relacl,
88                                         Datum reloptions);
89 static Oid AddNewRelationType(const char *typeName,
90                                    Oid typeNamespace,
91                                    Oid new_rel_oid,
92                                    char new_rel_kind,
93                                    Oid ownerid,
94                                    Oid new_row_type,
95                                    Oid new_array_type);
96 static void RelationRemoveInheritance(Oid relid);
97 static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
98                           bool is_validated, bool is_local, int inhcount,
99                           bool is_no_inherit, bool is_internal);
100 static void StoreConstraints(Relation rel, List *cooked_constraints,
101                                  bool is_internal);
102 static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
103                                                         bool allow_merge, bool is_local,
104                                                         bool is_no_inherit);
105 static void SetRelationNumChecks(Relation rel, int numchecks);
106 static Node *cookConstraint(ParseState *pstate,
107                            Node *raw_constraint,
108                            char *relname);
109 static List *insert_ordered_unique_oid(List *list, Oid datum);
110
111
112 /* ----------------------------------------------------------------
113  *                              XXX UGLY HARD CODED BADNESS FOLLOWS XXX
114  *
115  *              these should all be moved to someplace in the lib/catalog
116  *              module, if not obliterated first.
117  * ----------------------------------------------------------------
118  */
119
120
121 /*
122  * Note:
123  *              Should the system special case these attributes in the future?
124  *              Advantage:      consume much less space in the ATTRIBUTE relation.
125  *              Disadvantage:  special cases will be all over the place.
126  */
127
128 /*
129  * The initializers below do not include trailing variable length fields,
130  * but that's OK - we're never going to reference anything beyond the
131  * fixed-size portion of the structure anyway.
132  */
133
134 static FormData_pg_attribute a1 = {
135         0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
136         SelfItemPointerAttributeNumber, 0, -1, -1,
137         false, 'p', 's', true, false, false, true, 0
138 };
139
140 static FormData_pg_attribute a2 = {
141         0, {"oid"}, OIDOID, 0, sizeof(Oid),
142         ObjectIdAttributeNumber, 0, -1, -1,
143         true, 'p', 'i', true, false, false, true, 0
144 };
145
146 static FormData_pg_attribute a3 = {
147         0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
148         MinTransactionIdAttributeNumber, 0, -1, -1,
149         true, 'p', 'i', true, false, false, true, 0
150 };
151
152 static FormData_pg_attribute a4 = {
153         0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
154         MinCommandIdAttributeNumber, 0, -1, -1,
155         true, 'p', 'i', true, false, false, true, 0
156 };
157
158 static FormData_pg_attribute a5 = {
159         0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
160         MaxTransactionIdAttributeNumber, 0, -1, -1,
161         true, 'p', 'i', true, false, false, true, 0
162 };
163
164 static FormData_pg_attribute a6 = {
165         0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
166         MaxCommandIdAttributeNumber, 0, -1, -1,
167         true, 'p', 'i', true, false, false, true, 0
168 };
169
170 /*
171  * We decided to call this attribute "tableoid" rather than say
172  * "classoid" on the basis that in the future there may be more than one
173  * table of a particular class/type. In any case table is still the word
174  * used in SQL.
175  */
176 static FormData_pg_attribute a7 = {
177         0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
178         TableOidAttributeNumber, 0, -1, -1,
179         true, 'p', 'i', true, false, false, true, 0
180 };
181
182 static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
183
184 /*
185  * This function returns a Form_pg_attribute pointer for a system attribute.
186  * Note that we elog if the presented attno is invalid, which would only
187  * happen if there's a problem upstream.
188  */
189 Form_pg_attribute
190 SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
191 {
192         if (attno >= 0 || attno < -(int) lengthof(SysAtt))
193                 elog(ERROR, "invalid system attribute number %d", attno);
194         if (attno == ObjectIdAttributeNumber && !relhasoids)
195                 elog(ERROR, "invalid system attribute number %d", attno);
196         return SysAtt[-attno - 1];
197 }
198
199 /*
200  * If the given name is a system attribute name, return a Form_pg_attribute
201  * pointer for a prototype definition.  If not, return NULL.
202  */
203 Form_pg_attribute
204 SystemAttributeByName(const char *attname, bool relhasoids)
205 {
206         int                     j;
207
208         for (j = 0; j < (int) lengthof(SysAtt); j++)
209         {
210                 Form_pg_attribute att = SysAtt[j];
211
212                 if (relhasoids || att->attnum != ObjectIdAttributeNumber)
213                 {
214                         if (strcmp(NameStr(att->attname), attname) == 0)
215                                 return att;
216                 }
217         }
218
219         return NULL;
220 }
221
222
223 /* ----------------------------------------------------------------
224  *                              XXX END OF UGLY HARD CODED BADNESS XXX
225  * ---------------------------------------------------------------- */
226
227
228 /* ----------------------------------------------------------------
229  *              heap_create             - Create an uncataloged heap relation
230  *
231  *              Note API change: the caller must now always provide the OID
232  *              to use for the relation.  The relfilenode may (and, normally,
233  *              should) be left unspecified.
234  *
235  *              rel->rd_rel is initialized by RelationBuildLocalRelation,
236  *              and is mostly zeroes at return.
237  * ----------------------------------------------------------------
238  */
239 Relation
240 heap_create(const char *relname,
241                         Oid relnamespace,
242                         Oid reltablespace,
243                         Oid relid,
244                         Oid relfilenode,
245                         TupleDesc tupDesc,
246                         char relkind,
247                         char relpersistence,
248                         bool shared_relation,
249                         bool mapped_relation,
250                         bool allow_system_table_mods)
251 {
252         bool            create_storage;
253         Relation        rel;
254
255         /* The caller must have provided an OID for the relation. */
256         Assert(OidIsValid(relid));
257
258         /*
259          * Don't allow creating relations in pg_catalog directly, even though it
260          * is allowed to move user defined relations there. Semantics with search
261          * paths including pg_catalog are too confusing for now.
262          *
263          * But allow creating indexes on relations in pg_catalog even if
264          * allow_system_table_mods = off, upper layers already guarantee it's on a
265          * user defined relation, not a system one.
266          */
267         if (!allow_system_table_mods &&
268                 ((IsSystemNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
269                  IsToastNamespace(relnamespace)) &&
270                 IsNormalProcessingMode())
271                 ereport(ERROR,
272                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
273                                  errmsg("permission denied to create \"%s.%s\"",
274                                                 get_namespace_name(relnamespace), relname),
275                 errdetail("System catalog modifications are currently disallowed.")));
276
277         /*
278          * Decide if we need storage or not, and handle a couple other special
279          * cases for particular relkinds.
280          */
281         switch (relkind)
282         {
283                 case RELKIND_VIEW:
284                 case RELKIND_COMPOSITE_TYPE:
285                 case RELKIND_FOREIGN_TABLE:
286                         create_storage = false;
287
288                         /*
289                          * Force reltablespace to zero if the relation has no physical
290                          * storage.  This is mainly just for cleanliness' sake.
291                          */
292                         reltablespace = InvalidOid;
293                         break;
294                 case RELKIND_SEQUENCE:
295                         create_storage = true;
296
297                         /*
298                          * Force reltablespace to zero for sequences, since we don't
299                          * support moving them around into different tablespaces.
300                          */
301                         reltablespace = InvalidOid;
302                         break;
303                 default:
304                         create_storage = true;
305                         break;
306         }
307
308         /*
309          * Unless otherwise requested, the physical ID (relfilenode) is initially
310          * the same as the logical ID (OID).  When the caller did specify a
311          * relfilenode, it already exists; do not attempt to create it.
312          */
313         if (OidIsValid(relfilenode))
314                 create_storage = false;
315         else
316                 relfilenode = relid;
317
318         /*
319          * Never allow a pg_class entry to explicitly specify the database's
320          * default tablespace in reltablespace; force it to zero instead. This
321          * ensures that if the database is cloned with a different default
322          * tablespace, the pg_class entry will still match where CREATE DATABASE
323          * will put the physically copied relation.
324          *
325          * Yes, this is a bit of a hack.
326          */
327         if (reltablespace == MyDatabaseTableSpace)
328                 reltablespace = InvalidOid;
329
330         /*
331          * build the relcache entry.
332          */
333         rel = RelationBuildLocalRelation(relname,
334                                                                          relnamespace,
335                                                                          tupDesc,
336                                                                          relid,
337                                                                          relfilenode,
338                                                                          reltablespace,
339                                                                          shared_relation,
340                                                                          mapped_relation,
341                                                                          relpersistence,
342                                                                          relkind);
343
344         /*
345          * Have the storage manager create the relation's disk file, if needed.
346          *
347          * We only create the main fork here, other forks will be created on
348          * demand.
349          */
350         if (create_storage)
351         {
352                 RelationOpenSmgr(rel);
353                 RelationCreateStorage(rel->rd_node, relpersistence);
354         }
355
356         return rel;
357 }
358
359 /* ----------------------------------------------------------------
360  *              heap_create_with_catalog                - Create a cataloged relation
361  *
362  *              this is done in multiple steps:
363  *
364  *              1) CheckAttributeNamesTypes() is used to make certain the tuple
365  *                 descriptor contains a valid set of attribute names and types
366  *
367  *              2) pg_class is opened and get_relname_relid()
368  *                 performs a scan to ensure that no relation with the
369  *                 same name already exists.
370  *
371  *              3) heap_create() is called to create the new relation on disk.
372  *
373  *              4) TypeCreate() is called to define a new type corresponding
374  *                 to the new relation.
375  *
376  *              5) AddNewRelationTuple() is called to register the
377  *                 relation in pg_class.
378  *
379  *              6) AddNewAttributeTuples() is called to register the
380  *                 new relation's schema in pg_attribute.
381  *
382  *              7) StoreConstraints() is called                - vadim 08/22/97
383  *
384  *              8) the relations are closed and the new relation's oid
385  *                 is returned.
386  *
387  * ----------------------------------------------------------------
388  */
389
390 /* --------------------------------
391  *              CheckAttributeNamesTypes
392  *
393  *              this is used to make certain the tuple descriptor contains a
394  *              valid set of attribute names and datatypes.  a problem simply
395  *              generates ereport(ERROR) which aborts the current transaction.
396  * --------------------------------
397  */
398 void
399 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
400                                                  bool allow_system_table_mods)
401 {
402         int                     i;
403         int                     j;
404         int                     natts = tupdesc->natts;
405
406         /* Sanity check on column count */
407         if (natts < 0 || natts > MaxHeapAttributeNumber)
408                 ereport(ERROR,
409                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
410                                  errmsg("tables can have at most %d columns",
411                                                 MaxHeapAttributeNumber)));
412
413         /*
414          * first check for collision with system attribute names
415          *
416          * Skip this for a view or type relation, since those don't have system
417          * attributes.
418          */
419         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
420         {
421                 for (i = 0; i < natts; i++)
422                 {
423                         if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
424                                                                           tupdesc->tdhasoid) != NULL)
425                                 ereport(ERROR,
426                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
427                                                  errmsg("column name \"%s\" conflicts with a system column name",
428                                                                 NameStr(tupdesc->attrs[i]->attname))));
429                 }
430         }
431
432         /*
433          * next check for repeated attribute names
434          */
435         for (i = 1; i < natts; i++)
436         {
437                 for (j = 0; j < i; j++)
438                 {
439                         if (strcmp(NameStr(tupdesc->attrs[j]->attname),
440                                            NameStr(tupdesc->attrs[i]->attname)) == 0)
441                                 ereport(ERROR,
442                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
443                                                  errmsg("column name \"%s\" specified more than once",
444                                                                 NameStr(tupdesc->attrs[j]->attname))));
445                 }
446         }
447
448         /*
449          * next check the attribute types
450          */
451         for (i = 0; i < natts; i++)
452         {
453                 CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
454                                                    tupdesc->attrs[i]->atttypid,
455                                                    tupdesc->attrs[i]->attcollation,
456                                                    NIL, /* assume we're creating a new rowtype */
457                                                    allow_system_table_mods);
458         }
459 }
460
461 /* --------------------------------
462  *              CheckAttributeType
463  *
464  *              Verify that the proposed datatype of an attribute is legal.
465  *              This is needed mainly because there are types (and pseudo-types)
466  *              in the catalogs that we do not support as elements of real tuples.
467  *              We also check some other properties required of a table column.
468  *
469  * If the attribute is being proposed for addition to an existing table or
470  * composite type, pass a one-element list of the rowtype OID as
471  * containing_rowtypes.  When checking a to-be-created rowtype, it's
472  * sufficient to pass NIL, because there could not be any recursive reference
473  * to a not-yet-existing rowtype.
474  * --------------------------------
475  */
476 void
477 CheckAttributeType(const char *attname,
478                                    Oid atttypid, Oid attcollation,
479                                    List *containing_rowtypes,
480                                    bool allow_system_table_mods)
481 {
482         char            att_typtype = get_typtype(atttypid);
483         Oid                     att_typelem;
484
485         if (atttypid == UNKNOWNOID)
486         {
487                 /*
488                  * Warn user, but don't fail, if column to be created has UNKNOWN type
489                  * (usually as a result of a 'retrieve into' - jolly)
490                  */
491                 ereport(WARNING,
492                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
493                                  errmsg("column \"%s\" has type \"unknown\"", attname),
494                                  errdetail("Proceeding with relation creation anyway.")));
495         }
496         else if (att_typtype == TYPTYPE_PSEUDO)
497         {
498                 /*
499                  * Refuse any attempt to create a pseudo-type column, except for a
500                  * special hack for pg_statistic: allow ANYARRAY when modifying system
501                  * catalogs (this allows creating pg_statistic and cloning it during
502                  * VACUUM FULL)
503                  */
504                 if (atttypid != ANYARRAYOID || !allow_system_table_mods)
505                         ereport(ERROR,
506                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
507                                          errmsg("column \"%s\" has pseudo-type %s",
508                                                         attname, format_type_be(atttypid))));
509         }
510         else if (att_typtype == TYPTYPE_DOMAIN)
511         {
512                 /*
513                  * If it's a domain, recurse to check its base type.
514                  */
515                 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
516                                                    containing_rowtypes,
517                                                    allow_system_table_mods);
518         }
519         else if (att_typtype == TYPTYPE_COMPOSITE)
520         {
521                 /*
522                  * For a composite type, recurse into its attributes.
523                  */
524                 Relation        relation;
525                 TupleDesc       tupdesc;
526                 int                     i;
527
528                 /*
529                  * Check for self-containment.  Eventually we might be able to allow
530                  * this (just return without complaint, if so) but it's not clear how
531                  * many other places would require anti-recursion defenses before it
532                  * would be safe to allow tables to contain their own rowtype.
533                  */
534                 if (list_member_oid(containing_rowtypes, atttypid))
535                         ereport(ERROR,
536                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
537                                 errmsg("composite type %s cannot be made a member of itself",
538                                            format_type_be(atttypid))));
539
540                 containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
541
542                 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
543
544                 tupdesc = RelationGetDescr(relation);
545
546                 for (i = 0; i < tupdesc->natts; i++)
547                 {
548                         Form_pg_attribute attr = tupdesc->attrs[i];
549
550                         if (attr->attisdropped)
551                                 continue;
552                         CheckAttributeType(NameStr(attr->attname),
553                                                            attr->atttypid, attr->attcollation,
554                                                            containing_rowtypes,
555                                                            allow_system_table_mods);
556                 }
557
558                 relation_close(relation, AccessShareLock);
559
560                 containing_rowtypes = list_delete_first(containing_rowtypes);
561         }
562         else if (OidIsValid((att_typelem = get_element_type(atttypid))))
563         {
564                 /*
565                  * Must recurse into array types, too, in case they are composite.
566                  */
567                 CheckAttributeType(attname, att_typelem, attcollation,
568                                                    containing_rowtypes,
569                                                    allow_system_table_mods);
570         }
571
572         /*
573          * This might not be strictly invalid per SQL standard, but it is pretty
574          * useless, and it cannot be dumped, so we must disallow it.
575          */
576         if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
577                 ereport(ERROR,
578                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
579                                  errmsg("no collation was derived for column \"%s\" with collatable type %s",
580                                                 attname, format_type_be(atttypid)),
581                 errhint("Use the COLLATE clause to set the collation explicitly.")));
582 }
583
584 /*
585  * InsertPgAttributeTuple
586  *              Construct and insert a new tuple in pg_attribute.
587  *
588  * Caller has already opened and locked pg_attribute.  new_attribute is the
589  * attribute to insert (but we ignore attacl and attoptions, which are always
590  * initialized to NULL).
591  *
592  * indstate is the index state for CatalogIndexInsert.  It can be passed as
593  * NULL, in which case we'll fetch the necessary info.  (Don't do this when
594  * inserting multiple attributes, because it's a tad more expensive.)
595  */
596 void
597 InsertPgAttributeTuple(Relation pg_attribute_rel,
598                                            Form_pg_attribute new_attribute,
599                                            CatalogIndexState indstate)
600 {
601         Datum           values[Natts_pg_attribute];
602         bool            nulls[Natts_pg_attribute];
603         HeapTuple       tup;
604
605         /* This is a tad tedious, but way cleaner than what we used to do... */
606         memset(values, 0, sizeof(values));
607         memset(nulls, false, sizeof(nulls));
608
609         values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
610         values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
611         values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
612         values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
613         values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
614         values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
615         values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
616         values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
617         values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
618         values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
619         values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
620         values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
621         values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
622         values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
623         values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
624         values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
625         values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
626         values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
627
628         /* start out with empty permissions and empty options */
629         nulls[Anum_pg_attribute_attacl - 1] = true;
630         nulls[Anum_pg_attribute_attoptions - 1] = true;
631         nulls[Anum_pg_attribute_attfdwoptions - 1] = true;
632
633         tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
634
635         /* finally insert the new tuple, update the indexes, and clean up */
636         simple_heap_insert(pg_attribute_rel, tup);
637
638         if (indstate != NULL)
639                 CatalogIndexInsert(indstate, tup);
640         else
641                 CatalogUpdateIndexes(pg_attribute_rel, tup);
642
643         heap_freetuple(tup);
644 }
645
646 /* --------------------------------
647  *              AddNewAttributeTuples
648  *
649  *              this registers the new relation's schema by adding
650  *              tuples to pg_attribute.
651  * --------------------------------
652  */
653 static void
654 AddNewAttributeTuples(Oid new_rel_oid,
655                                           TupleDesc tupdesc,
656                                           char relkind,
657                                           bool oidislocal,
658                                           int oidinhcount)
659 {
660         Form_pg_attribute attr;
661         int                     i;
662         Relation        rel;
663         CatalogIndexState indstate;
664         int                     natts = tupdesc->natts;
665         ObjectAddress myself,
666                                 referenced;
667
668         /*
669          * open pg_attribute and its indexes.
670          */
671         rel = heap_open(AttributeRelationId, RowExclusiveLock);
672
673         indstate = CatalogOpenIndexes(rel);
674
675         /*
676          * First we add the user attributes.  This is also a convenient place to
677          * add dependencies on their datatypes and collations.
678          */
679         for (i = 0; i < natts; i++)
680         {
681                 attr = tupdesc->attrs[i];
682                 /* Fill in the correct relation OID */
683                 attr->attrelid = new_rel_oid;
684                 /* Make sure these are OK, too */
685                 attr->attstattarget = -1;
686                 attr->attcacheoff = -1;
687
688                 InsertPgAttributeTuple(rel, attr, indstate);
689
690                 /* Add dependency info */
691                 myself.classId = RelationRelationId;
692                 myself.objectId = new_rel_oid;
693                 myself.objectSubId = i + 1;
694                 referenced.classId = TypeRelationId;
695                 referenced.objectId = attr->atttypid;
696                 referenced.objectSubId = 0;
697                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
698
699                 /* The default collation is pinned, so don't bother recording it */
700                 if (OidIsValid(attr->attcollation) &&
701                         attr->attcollation != DEFAULT_COLLATION_OID)
702                 {
703                         referenced.classId = CollationRelationId;
704                         referenced.objectId = attr->attcollation;
705                         referenced.objectSubId = 0;
706                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
707                 }
708         }
709
710         /*
711          * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
712          * all for a view or type relation.  We don't bother with making datatype
713          * dependencies here, since presumably all these types are pinned.
714          */
715         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
716         {
717                 for (i = 0; i < (int) lengthof(SysAtt); i++)
718                 {
719                         FormData_pg_attribute attStruct;
720
721                         /* skip OID where appropriate */
722                         if (!tupdesc->tdhasoid &&
723                                 SysAtt[i]->attnum == ObjectIdAttributeNumber)
724                                 continue;
725
726                         memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
727
728                         /* Fill in the correct relation OID in the copied tuple */
729                         attStruct.attrelid = new_rel_oid;
730
731                         /* Fill in correct inheritance info for the OID column */
732                         if (attStruct.attnum == ObjectIdAttributeNumber)
733                         {
734                                 attStruct.attislocal = oidislocal;
735                                 attStruct.attinhcount = oidinhcount;
736                         }
737
738                         InsertPgAttributeTuple(rel, &attStruct, indstate);
739                 }
740         }
741
742         /*
743          * clean up
744          */
745         CatalogCloseIndexes(indstate);
746
747         heap_close(rel, RowExclusiveLock);
748 }
749
750 /* --------------------------------
751  *              InsertPgClassTuple
752  *
753  *              Construct and insert a new tuple in pg_class.
754  *
755  * Caller has already opened and locked pg_class.
756  * Tuple data is taken from new_rel_desc->rd_rel, except for the
757  * variable-width fields which are not present in a cached reldesc.
758  * relacl and reloptions are passed in Datum form (to avoid having
759  * to reference the data types in heap.h).      Pass (Datum) 0 to set them
760  * to NULL.
761  * --------------------------------
762  */
763 void
764 InsertPgClassTuple(Relation pg_class_desc,
765                                    Relation new_rel_desc,
766                                    Oid new_rel_oid,
767                                    Datum relacl,
768                                    Datum reloptions)
769 {
770         Form_pg_class rd_rel = new_rel_desc->rd_rel;
771         Datum           values[Natts_pg_class];
772         bool            nulls[Natts_pg_class];
773         HeapTuple       tup;
774
775         /* This is a tad tedious, but way cleaner than what we used to do... */
776         memset(values, 0, sizeof(values));
777         memset(nulls, false, sizeof(nulls));
778
779         values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
780         values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
781         values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
782         values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
783         values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
784         values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
785         values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
786         values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
787         values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
788         values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
789         values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
790         values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
791         values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
792         values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
793         values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
794         values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
795         values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
796         values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
797         values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
798         values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
799         values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
800         values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
801         values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
802         values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
803         values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
804         values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
805         values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
806         if (relacl != (Datum) 0)
807                 values[Anum_pg_class_relacl - 1] = relacl;
808         else
809                 nulls[Anum_pg_class_relacl - 1] = true;
810         if (reloptions != (Datum) 0)
811                 values[Anum_pg_class_reloptions - 1] = reloptions;
812         else
813                 nulls[Anum_pg_class_reloptions - 1] = true;
814
815         tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
816
817         /*
818          * The new tuple must have the oid already chosen for the rel.  Sure would
819          * be embarrassing to do this sort of thing in polite company.
820          */
821         HeapTupleSetOid(tup, new_rel_oid);
822
823         /* finally insert the new tuple, update the indexes, and clean up */
824         simple_heap_insert(pg_class_desc, tup);
825
826         CatalogUpdateIndexes(pg_class_desc, tup);
827
828         heap_freetuple(tup);
829 }
830
831 /* --------------------------------
832  *              AddNewRelationTuple
833  *
834  *              this registers the new relation in the catalogs by
835  *              adding a tuple to pg_class.
836  * --------------------------------
837  */
838 static void
839 AddNewRelationTuple(Relation pg_class_desc,
840                                         Relation new_rel_desc,
841                                         Oid new_rel_oid,
842                                         Oid new_type_oid,
843                                         Oid reloftype,
844                                         Oid relowner,
845                                         char relkind,
846                                         Datum relacl,
847                                         Datum reloptions)
848 {
849         Form_pg_class new_rel_reltup;
850
851         /*
852          * first we update some of the information in our uncataloged relation's
853          * relation descriptor.
854          */
855         new_rel_reltup = new_rel_desc->rd_rel;
856
857         switch (relkind)
858         {
859                 case RELKIND_RELATION:
860                 case RELKIND_MATVIEW:
861                 case RELKIND_INDEX:
862                 case RELKIND_TOASTVALUE:
863                         /* The relation is real, but as yet empty */
864                         new_rel_reltup->relpages = 0;
865                         new_rel_reltup->reltuples = 0;
866                         new_rel_reltup->relallvisible = 0;
867                         break;
868                 case RELKIND_SEQUENCE:
869                         /* Sequences always have a known size */
870                         new_rel_reltup->relpages = 1;
871                         new_rel_reltup->reltuples = 1;
872                         new_rel_reltup->relallvisible = 0;
873                         break;
874                 default:
875                         /* Views, etc, have no disk storage */
876                         new_rel_reltup->relpages = 0;
877                         new_rel_reltup->reltuples = 0;
878                         new_rel_reltup->relallvisible = 0;
879                         break;
880         }
881
882         /* Initialize relfrozenxid and relminmxid */
883         if (relkind == RELKIND_RELATION ||
884                 relkind == RELKIND_MATVIEW ||
885                 relkind == RELKIND_TOASTVALUE)
886         {
887                 /*
888                  * Initialize to the minimum XID that could put tuples in the table.
889                  * We know that no xacts older than RecentXmin are still running, so
890                  * that will do.
891                  */
892                 new_rel_reltup->relfrozenxid = RecentXmin;
893
894                 /*
895                  * Similarly, initialize the minimum Multixact to the first value that
896                  * could possibly be stored in tuples in the table.  Running
897                  * transactions could reuse values from their local cache, so we are
898                  * careful to consider all currently running multis.
899                  *
900                  * XXX this could be refined further, but is it worth the hassle?
901                  */
902                 new_rel_reltup->relminmxid = GetOldestMultiXactId();
903         }
904         else
905         {
906                 /*
907                  * Other relation types will not contain XIDs, so set relfrozenxid to
908                  * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
909                  * we force its xmin to be FrozenTransactionId always; see
910                  * commands/sequence.c.)
911                  */
912                 new_rel_reltup->relfrozenxid = InvalidTransactionId;
913                 new_rel_reltup->relfrozenxid = InvalidMultiXactId;
914         }
915
916         new_rel_reltup->relowner = relowner;
917         new_rel_reltup->reltype = new_type_oid;
918         new_rel_reltup->reloftype = reloftype;
919
920         new_rel_desc->rd_att->tdtypeid = new_type_oid;
921
922         /* Now build and insert the tuple */
923         InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
924                                            relacl, reloptions);
925 }
926
927
928 /* --------------------------------
929  *              AddNewRelationType -
930  *
931  *              define a composite type corresponding to the new relation
932  * --------------------------------
933  */
934 static Oid
935 AddNewRelationType(const char *typeName,
936                                    Oid typeNamespace,
937                                    Oid new_rel_oid,
938                                    char new_rel_kind,
939                                    Oid ownerid,
940                                    Oid new_row_type,
941                                    Oid new_array_type)
942 {
943         return
944                 TypeCreate(new_row_type,        /* optional predetermined OID */
945                                    typeName,    /* type name */
946                                    typeNamespace,               /* type namespace */
947                                    new_rel_oid, /* relation oid */
948                                    new_rel_kind,        /* relation kind */
949                                    ownerid,             /* owner's ID */
950                                    -1,                  /* internal size (varlena) */
951                                    TYPTYPE_COMPOSITE,   /* type-type (composite) */
952                                    TYPCATEGORY_COMPOSITE,               /* type-category (ditto) */
953                                    false,               /* composite types are never preferred */
954                                    DEFAULT_TYPDELIM,    /* default array delimiter */
955                                    F_RECORD_IN, /* input procedure */
956                                    F_RECORD_OUT,        /* output procedure */
957                                    F_RECORD_RECV,               /* receive procedure */
958                                    F_RECORD_SEND,               /* send procedure */
959                                    InvalidOid,  /* typmodin procedure - none */
960                                    InvalidOid,  /* typmodout procedure - none */
961                                    InvalidOid,  /* analyze procedure - default */
962                                    InvalidOid,  /* array element type - irrelevant */
963                                    false,               /* this is not an array type */
964                                    new_array_type,              /* array type if any */
965                                    InvalidOid,  /* domain base type - irrelevant */
966                                    NULL,                /* default value - none */
967                                    NULL,                /* default binary representation */
968                                    false,               /* passed by reference */
969                                    'd',                 /* alignment - must be the largest! */
970                                    'x',                 /* fully TOASTable */
971                                    -1,                  /* typmod */
972                                    0,                   /* array dimensions for typBaseType */
973                                    false,               /* Type NOT NULL */
974                                    InvalidOid); /* rowtypes never have a collation */
975 }
976
977 /* --------------------------------
978  *              heap_create_with_catalog
979  *
980  *              creates a new cataloged relation.  see comments above.
981  *
982  * Arguments:
983  *      relname: name to give to new rel
984  *      relnamespace: OID of namespace it goes in
985  *      reltablespace: OID of tablespace it goes in
986  *      relid: OID to assign to new rel, or InvalidOid to select a new OID
987  *      reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
988  *      reloftypeid: if a typed table, OID of underlying type; else InvalidOid
989  *      ownerid: OID of new rel's owner
990  *      tupdesc: tuple descriptor (source of column definitions)
991  *      cooked_constraints: list of precooked check constraints and defaults
992  *      relkind: relkind for new rel
993  *      relpersistence: rel's persistence status (permanent, temp, or unlogged)
994  *      shared_relation: TRUE if it's to be a shared relation
995  *      mapped_relation: TRUE if the relation will use the relfilenode map
996  *      oidislocal: TRUE if oid column (if any) should be marked attislocal
997  *      oidinhcount: attinhcount to assign to oid column (if any)
998  *      oncommit: ON COMMIT marking (only relevant if it's a temp table)
999  *      reloptions: reloptions in Datum form, or (Datum) 0 if none
1000  *      use_user_acl: TRUE if should look for user-defined default permissions;
1001  *              if FALSE, relacl is always set NULL
1002  *      allow_system_table_mods: TRUE to allow creation in system namespaces
1003  *
1004  * Returns the OID of the new relation
1005  * --------------------------------
1006  */
1007 Oid
1008 heap_create_with_catalog(const char *relname,
1009                                                  Oid relnamespace,
1010                                                  Oid reltablespace,
1011                                                  Oid relid,
1012                                                  Oid reltypeid,
1013                                                  Oid reloftypeid,
1014                                                  Oid ownerid,
1015                                                  TupleDesc tupdesc,
1016                                                  List *cooked_constraints,
1017                                                  char relkind,
1018                                                  char relpersistence,
1019                                                  bool shared_relation,
1020                                                  bool mapped_relation,
1021                                                  bool oidislocal,
1022                                                  int oidinhcount,
1023                                                  OnCommitAction oncommit,
1024                                                  Datum reloptions,
1025                                                  bool use_user_acl,
1026                                                  bool allow_system_table_mods,
1027                                                  bool is_internal)
1028 {
1029         Relation        pg_class_desc;
1030         Relation        new_rel_desc;
1031         Acl                *relacl;
1032         Oid                     existing_relid;
1033         Oid                     old_type_oid;
1034         Oid                     new_type_oid;
1035         Oid                     new_array_oid = InvalidOid;
1036
1037         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1038
1039         /*
1040          * sanity checks
1041          */
1042         Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1043
1044         CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
1045
1046         /*
1047          * This would fail later on anyway, if the relation already exists.  But
1048          * by catching it here we can emit a nicer error message.
1049          */
1050         existing_relid = get_relname_relid(relname, relnamespace);
1051         if (existing_relid != InvalidOid)
1052                 ereport(ERROR,
1053                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1054                                  errmsg("relation \"%s\" already exists", relname)));
1055
1056         /*
1057          * Since we are going to create a rowtype as well, also check for
1058          * collision with an existing type name.  If there is one and it's an
1059          * autogenerated array, we can rename it out of the way; otherwise we can
1060          * at least give a good error message.
1061          */
1062         old_type_oid = GetSysCacheOid2(TYPENAMENSP,
1063                                                                    CStringGetDatum(relname),
1064                                                                    ObjectIdGetDatum(relnamespace));
1065         if (OidIsValid(old_type_oid))
1066         {
1067                 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1068                         ereport(ERROR,
1069                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
1070                                          errmsg("type \"%s\" already exists", relname),
1071                            errhint("A relation has an associated type of the same name, "
1072                                            "so you must use a name that doesn't conflict "
1073                                            "with any existing type.")));
1074         }
1075
1076         /*
1077          * Shared relations must be in pg_global (last-ditch check)
1078          */
1079         if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1080                 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1081
1082         /*
1083          * Allocate an OID for the relation, unless we were told what to use.
1084          *
1085          * The OID will be the relfilenode as well, so make sure it doesn't
1086          * collide with either pg_class OIDs or existing physical files.
1087          */
1088         if (!OidIsValid(relid))
1089         {
1090                 /*
1091                  * Use binary-upgrade override for pg_class.oid/relfilenode, if
1092                  * supplied.
1093                  */
1094                 if (IsBinaryUpgrade &&
1095                         OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
1096                         (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1097                          relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1098                          relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE))
1099                 {
1100                         relid = binary_upgrade_next_heap_pg_class_oid;
1101                         binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1102                 }
1103                 else if (IsBinaryUpgrade &&
1104                                  OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1105                                  relkind == RELKIND_TOASTVALUE)
1106                 {
1107                         relid = binary_upgrade_next_toast_pg_class_oid;
1108                         binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1109                 }
1110                 else
1111                         relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1112                                                                           relpersistence);
1113         }
1114
1115         /*
1116          * Determine the relation's initial permissions.
1117          */
1118         if (use_user_acl)
1119         {
1120                 switch (relkind)
1121                 {
1122                         case RELKIND_RELATION:
1123                         case RELKIND_VIEW:
1124                         case RELKIND_MATVIEW:
1125                         case RELKIND_FOREIGN_TABLE:
1126                                 relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
1127                                                                                           relnamespace);
1128                                 break;
1129                         case RELKIND_SEQUENCE:
1130                                 relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
1131                                                                                           relnamespace);
1132                                 break;
1133                         default:
1134                                 relacl = NULL;
1135                                 break;
1136                 }
1137         }
1138         else
1139                 relacl = NULL;
1140
1141         /*
1142          * Create the relcache entry (mostly dummy at this point) and the physical
1143          * disk file.  (If we fail further down, it's the smgr's responsibility to
1144          * remove the disk file again.)
1145          */
1146         new_rel_desc = heap_create(relname,
1147                                                            relnamespace,
1148                                                            reltablespace,
1149                                                            relid,
1150                                                            InvalidOid,
1151                                                            tupdesc,
1152                                                            relkind,
1153                                                            relpersistence,
1154                                                            shared_relation,
1155                                                            mapped_relation,
1156                                                            allow_system_table_mods);
1157
1158         Assert(relid == RelationGetRelid(new_rel_desc));
1159
1160         /*
1161          * Decide whether to create an array type over the relation's rowtype. We
1162          * do not create any array types for system catalogs (ie, those made
1163          * during initdb). We do not create them where the use of a relation as
1164          * such is an implementation detail: toast tables, sequences and indexes.
1165          */
1166         if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1167                                                           relkind == RELKIND_VIEW ||
1168                                                           relkind == RELKIND_MATVIEW ||
1169                                                           relkind == RELKIND_FOREIGN_TABLE ||
1170                                                           relkind == RELKIND_COMPOSITE_TYPE))
1171                 new_array_oid = AssignTypeArrayOid();
1172
1173         /*
1174          * Since defining a relation also defines a complex type, we add a new
1175          * system type corresponding to the new relation.  The OID of the type can
1176          * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1177          * generate a new OID for it.
1178          *
1179          * NOTE: we could get a unique-index failure here, in case someone else is
1180          * creating the same type name in parallel but hadn't committed yet when
1181          * we checked for a duplicate name above.
1182          */
1183         new_type_oid = AddNewRelationType(relname,
1184                                                                           relnamespace,
1185                                                                           relid,
1186                                                                           relkind,
1187                                                                           ownerid,
1188                                                                           reltypeid,
1189                                                                           new_array_oid);
1190
1191         /*
1192          * Now make the array type if wanted.
1193          */
1194         if (OidIsValid(new_array_oid))
1195         {
1196                 char       *relarrayname;
1197
1198                 relarrayname = makeArrayTypeName(relname, relnamespace);
1199
1200                 TypeCreate(new_array_oid,               /* force the type's OID to this */
1201                                    relarrayname,        /* Array type name */
1202                                    relnamespace,        /* Same namespace as parent */
1203                                    InvalidOid,  /* Not composite, no relationOid */
1204                                    0,                   /* relkind, also N/A here */
1205                                    ownerid,             /* owner's ID */
1206                                    -1,                  /* Internal size (varlena) */
1207                                    TYPTYPE_BASE,        /* Not composite - typelem is */
1208                                    TYPCATEGORY_ARRAY,   /* type-category (array) */
1209                                    false,               /* array types are never preferred */
1210                                    DEFAULT_TYPDELIM,    /* default array delimiter */
1211                                    F_ARRAY_IN,  /* array input proc */
1212                                    F_ARRAY_OUT, /* array output proc */
1213                                    F_ARRAY_RECV,        /* array recv (bin) proc */
1214                                    F_ARRAY_SEND,        /* array send (bin) proc */
1215                                    InvalidOid,  /* typmodin procedure - none */
1216                                    InvalidOid,  /* typmodout procedure - none */
1217                                    F_ARRAY_TYPANALYZE,  /* array analyze procedure */
1218                                    new_type_oid,        /* array element type - the rowtype */
1219                                    true,                /* yes, this is an array type */
1220                                    InvalidOid,  /* this has no array type */
1221                                    InvalidOid,  /* domain base type - irrelevant */
1222                                    NULL,                /* default value - none */
1223                                    NULL,                /* default binary representation */
1224                                    false,               /* passed by reference */
1225                                    'd',                 /* alignment - must be the largest! */
1226                                    'x',                 /* fully TOASTable */
1227                                    -1,                  /* typmod */
1228                                    0,                   /* array dimensions for typBaseType */
1229                                    false,               /* Type NOT NULL */
1230                                    InvalidOid); /* rowtypes never have a collation */
1231
1232                 pfree(relarrayname);
1233         }
1234
1235         /*
1236          * now create an entry in pg_class for the relation.
1237          *
1238          * NOTE: we could get a unique-index failure here, in case someone else is
1239          * creating the same relation name in parallel but hadn't committed yet
1240          * when we checked for a duplicate name above.
1241          */
1242         AddNewRelationTuple(pg_class_desc,
1243                                                 new_rel_desc,
1244                                                 relid,
1245                                                 new_type_oid,
1246                                                 reloftypeid,
1247                                                 ownerid,
1248                                                 relkind,
1249                                                 PointerGetDatum(relacl),
1250                                                 reloptions);
1251
1252         /*
1253          * now add tuples to pg_attribute for the attributes in our new relation.
1254          */
1255         AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
1256                                                   oidislocal, oidinhcount);
1257
1258         /*
1259          * Make a dependency link to force the relation to be deleted if its
1260          * namespace is.  Also make a dependency link to its owner, as well as
1261          * dependencies for any roles mentioned in the default ACL.
1262          *
1263          * For composite types, these dependencies are tracked for the pg_type
1264          * entry, so we needn't record them here.  Likewise, TOAST tables don't
1265          * need a namespace dependency (they live in a pinned namespace) nor an
1266          * owner dependency (they depend indirectly through the parent table), nor
1267          * should they have any ACL entries.  The same applies for extension
1268          * dependencies.
1269          *
1270          * If it's a temp table, we do not make it an extension member; this
1271          * prevents the unintuitive result that deletion of the temp table at
1272          * session end would make the whole extension go away.
1273          *
1274          * Also, skip this in bootstrap mode, since we don't make dependencies
1275          * while bootstrapping.
1276          */
1277         if (relkind != RELKIND_COMPOSITE_TYPE &&
1278                 relkind != RELKIND_TOASTVALUE &&
1279                 !IsBootstrapProcessingMode())
1280         {
1281                 ObjectAddress myself,
1282                                         referenced;
1283
1284                 myself.classId = RelationRelationId;
1285                 myself.objectId = relid;
1286                 myself.objectSubId = 0;
1287                 referenced.classId = NamespaceRelationId;
1288                 referenced.objectId = relnamespace;
1289                 referenced.objectSubId = 0;
1290                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1291
1292                 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1293
1294                 if (relpersistence != RELPERSISTENCE_TEMP)
1295                         recordDependencyOnCurrentExtension(&myself, false);
1296
1297                 if (reloftypeid)
1298                 {
1299                         referenced.classId = TypeRelationId;
1300                         referenced.objectId = reloftypeid;
1301                         referenced.objectSubId = 0;
1302                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1303                 }
1304
1305                 if (relacl != NULL)
1306                 {
1307                         int                     nnewmembers;
1308                         Oid                *newmembers;
1309
1310                         nnewmembers = aclmembers(relacl, &newmembers);
1311                         updateAclDependencies(RelationRelationId, relid, 0,
1312                                                                   ownerid,
1313                                                                   0, NULL,
1314                                                                   nnewmembers, newmembers);
1315                 }
1316         }
1317
1318         /* Post creation hook for new relation */
1319         InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1320
1321         /*
1322          * Store any supplied constraints and defaults.
1323          *
1324          * NB: this may do a CommandCounterIncrement and rebuild the relcache
1325          * entry, so the relation must be valid and self-consistent at this point.
1326          * In particular, there are not yet constraints and defaults anywhere.
1327          */
1328         StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1329
1330         /*
1331          * If there's a special on-commit action, remember it
1332          */
1333         if (oncommit != ONCOMMIT_NOOP)
1334                 register_on_commit_action(relid, oncommit);
1335
1336         if (relpersistence == RELPERSISTENCE_UNLOGGED)
1337         {
1338                 Assert(relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW ||
1339                            relkind == RELKIND_TOASTVALUE);
1340                 heap_create_init_fork(new_rel_desc);
1341         }
1342
1343         /*
1344          * ok, the relation has been cataloged, so close our relations and return
1345          * the OID of the newly created relation.
1346          */
1347         heap_close(new_rel_desc, NoLock);       /* do not unlock till end of xact */
1348         heap_close(pg_class_desc, RowExclusiveLock);
1349
1350         return relid;
1351 }
1352
1353 /*
1354  * Set up an init fork for an unlogged table so that it can be correctly
1355  * reinitialized on restart.  Since we're going to do an immediate sync, we
1356  * only need to xlog this if archiving or streaming is enabled.  And the
1357  * immediate sync is required, because otherwise there's no guarantee that
1358  * this will hit the disk before the next checkpoint moves the redo pointer.
1359  */
1360 void
1361 heap_create_init_fork(Relation rel)
1362 {
1363         RelationOpenSmgr(rel);
1364         smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
1365         if (XLogIsNeeded())
1366                 log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
1367         smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
1368 }
1369
1370 /*
1371  *              RelationRemoveInheritance
1372  *
1373  * Formerly, this routine checked for child relations and aborted the
1374  * deletion if any were found.  Now we rely on the dependency mechanism
1375  * to check for or delete child relations.      By the time we get here,
1376  * there are no children and we need only remove any pg_inherits rows
1377  * linking this relation to its parent(s).
1378  */
1379 static void
1380 RelationRemoveInheritance(Oid relid)
1381 {
1382         Relation        catalogRelation;
1383         SysScanDesc scan;
1384         ScanKeyData key;
1385         HeapTuple       tuple;
1386
1387         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
1388
1389         ScanKeyInit(&key,
1390                                 Anum_pg_inherits_inhrelid,
1391                                 BTEqualStrategyNumber, F_OIDEQ,
1392                                 ObjectIdGetDatum(relid));
1393
1394         scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1395                                                           NULL, 1, &key);
1396
1397         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1398                 simple_heap_delete(catalogRelation, &tuple->t_self);
1399
1400         systable_endscan(scan);
1401         heap_close(catalogRelation, RowExclusiveLock);
1402 }
1403
1404 /*
1405  *              DeleteRelationTuple
1406  *
1407  * Remove pg_class row for the given relid.
1408  *
1409  * Note: this is shared by relation deletion and index deletion.  It's
1410  * not intended for use anyplace else.
1411  */
1412 void
1413 DeleteRelationTuple(Oid relid)
1414 {
1415         Relation        pg_class_desc;
1416         HeapTuple       tup;
1417
1418         /* Grab an appropriate lock on the pg_class relation */
1419         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1420
1421         tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1422         if (!HeapTupleIsValid(tup))
1423                 elog(ERROR, "cache lookup failed for relation %u", relid);
1424
1425         /* delete the relation tuple from pg_class, and finish up */
1426         simple_heap_delete(pg_class_desc, &tup->t_self);
1427
1428         ReleaseSysCache(tup);
1429
1430         heap_close(pg_class_desc, RowExclusiveLock);
1431 }
1432
1433 /*
1434  *              DeleteAttributeTuples
1435  *
1436  * Remove pg_attribute rows for the given relid.
1437  *
1438  * Note: this is shared by relation deletion and index deletion.  It's
1439  * not intended for use anyplace else.
1440  */
1441 void
1442 DeleteAttributeTuples(Oid relid)
1443 {
1444         Relation        attrel;
1445         SysScanDesc scan;
1446         ScanKeyData key[1];
1447         HeapTuple       atttup;
1448
1449         /* Grab an appropriate lock on the pg_attribute relation */
1450         attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1451
1452         /* Use the index to scan only attributes of the target relation */
1453         ScanKeyInit(&key[0],
1454                                 Anum_pg_attribute_attrelid,
1455                                 BTEqualStrategyNumber, F_OIDEQ,
1456                                 ObjectIdGetDatum(relid));
1457
1458         scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1459                                                           NULL, 1, key);
1460
1461         /* Delete all the matching tuples */
1462         while ((atttup = systable_getnext(scan)) != NULL)
1463                 simple_heap_delete(attrel, &atttup->t_self);
1464
1465         /* Clean up after the scan */
1466         systable_endscan(scan);
1467         heap_close(attrel, RowExclusiveLock);
1468 }
1469
1470 /*
1471  *              DeleteSystemAttributeTuples
1472  *
1473  * Remove pg_attribute rows for system columns of the given relid.
1474  *
1475  * Note: this is only used when converting a table to a view.  Views don't
1476  * have system columns, so we should remove them from pg_attribute.
1477  */
1478 void
1479 DeleteSystemAttributeTuples(Oid relid)
1480 {
1481         Relation        attrel;
1482         SysScanDesc scan;
1483         ScanKeyData key[2];
1484         HeapTuple       atttup;
1485
1486         /* Grab an appropriate lock on the pg_attribute relation */
1487         attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1488
1489         /* Use the index to scan only system attributes of the target relation */
1490         ScanKeyInit(&key[0],
1491                                 Anum_pg_attribute_attrelid,
1492                                 BTEqualStrategyNumber, F_OIDEQ,
1493                                 ObjectIdGetDatum(relid));
1494         ScanKeyInit(&key[1],
1495                                 Anum_pg_attribute_attnum,
1496                                 BTLessEqualStrategyNumber, F_INT2LE,
1497                                 Int16GetDatum(0));
1498
1499         scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1500                                                           NULL, 2, key);
1501
1502         /* Delete all the matching tuples */
1503         while ((atttup = systable_getnext(scan)) != NULL)
1504                 simple_heap_delete(attrel, &atttup->t_self);
1505
1506         /* Clean up after the scan */
1507         systable_endscan(scan);
1508         heap_close(attrel, RowExclusiveLock);
1509 }
1510
1511 /*
1512  *              RemoveAttributeById
1513  *
1514  * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1515  * deleted in pg_attribute.  We also remove pg_statistic entries for it.
1516  * (Everything else needed, such as getting rid of any pg_attrdef entry,
1517  * is handled by dependency.c.)
1518  */
1519 void
1520 RemoveAttributeById(Oid relid, AttrNumber attnum)
1521 {
1522         Relation        rel;
1523         Relation        attr_rel;
1524         HeapTuple       tuple;
1525         Form_pg_attribute attStruct;
1526         char            newattname[NAMEDATALEN];
1527
1528         /*
1529          * Grab an exclusive lock on the target table, which we will NOT release
1530          * until end of transaction.  (In the simple case where we are directly
1531          * dropping this column, AlterTableDropColumn already did this ... but
1532          * when cascading from a drop of some other object, we may not have any
1533          * lock.)
1534          */
1535         rel = relation_open(relid, AccessExclusiveLock);
1536
1537         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1538
1539         tuple = SearchSysCacheCopy2(ATTNUM,
1540                                                                 ObjectIdGetDatum(relid),
1541                                                                 Int16GetDatum(attnum));
1542         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1543                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1544                          attnum, relid);
1545         attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1546
1547         if (attnum < 0)
1548         {
1549                 /* System attribute (probably OID) ... just delete the row */
1550
1551                 simple_heap_delete(attr_rel, &tuple->t_self);
1552         }
1553         else
1554         {
1555                 /* Dropping user attributes is lots harder */
1556
1557                 /* Mark the attribute as dropped */
1558                 attStruct->attisdropped = true;
1559
1560                 /*
1561                  * Set the type OID to invalid.  A dropped attribute's type link
1562                  * cannot be relied on (once the attribute is dropped, the type might
1563                  * be too). Fortunately we do not need the type row --- the only
1564                  * really essential information is the type's typlen and typalign,
1565                  * which are preserved in the attribute's attlen and attalign.  We set
1566                  * atttypid to zero here as a means of catching code that incorrectly
1567                  * expects it to be valid.
1568                  */
1569                 attStruct->atttypid = InvalidOid;
1570
1571                 /* Remove any NOT NULL constraint the column may have */
1572                 attStruct->attnotnull = false;
1573
1574                 /* We don't want to keep stats for it anymore */
1575                 attStruct->attstattarget = 0;
1576
1577                 /*
1578                  * Change the column name to something that isn't likely to conflict
1579                  */
1580                 snprintf(newattname, sizeof(newattname),
1581                                  "........pg.dropped.%d........", attnum);
1582                 namestrcpy(&(attStruct->attname), newattname);
1583
1584                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
1585
1586                 /* keep the system catalog indexes current */
1587                 CatalogUpdateIndexes(attr_rel, tuple);
1588         }
1589
1590         /*
1591          * Because updating the pg_attribute row will trigger a relcache flush for
1592          * the target relation, we need not do anything else to notify other
1593          * backends of the change.
1594          */
1595
1596         heap_close(attr_rel, RowExclusiveLock);
1597
1598         if (attnum > 0)
1599                 RemoveStatistics(relid, attnum);
1600
1601         relation_close(rel, NoLock);
1602 }
1603
1604 /*
1605  *              RemoveAttrDefault
1606  *
1607  * If the specified relation/attribute has a default, remove it.
1608  * (If no default, raise error if complain is true, else return quietly.)
1609  */
1610 void
1611 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1612                                   DropBehavior behavior, bool complain, bool internal)
1613 {
1614         Relation        attrdef_rel;
1615         ScanKeyData scankeys[2];
1616         SysScanDesc scan;
1617         HeapTuple       tuple;
1618         bool            found = false;
1619
1620         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1621
1622         ScanKeyInit(&scankeys[0],
1623                                 Anum_pg_attrdef_adrelid,
1624                                 BTEqualStrategyNumber, F_OIDEQ,
1625                                 ObjectIdGetDatum(relid));
1626         ScanKeyInit(&scankeys[1],
1627                                 Anum_pg_attrdef_adnum,
1628                                 BTEqualStrategyNumber, F_INT2EQ,
1629                                 Int16GetDatum(attnum));
1630
1631         scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1632                                                           NULL, 2, scankeys);
1633
1634         /* There should be at most one matching tuple, but we loop anyway */
1635         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1636         {
1637                 ObjectAddress object;
1638
1639                 object.classId = AttrDefaultRelationId;
1640                 object.objectId = HeapTupleGetOid(tuple);
1641                 object.objectSubId = 0;
1642
1643                 performDeletion(&object, behavior,
1644                                                 internal ? PERFORM_DELETION_INTERNAL : 0);
1645
1646                 found = true;
1647         }
1648
1649         systable_endscan(scan);
1650         heap_close(attrdef_rel, RowExclusiveLock);
1651
1652         if (complain && !found)
1653                 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1654                          relid, attnum);
1655 }
1656
1657 /*
1658  *              RemoveAttrDefaultById
1659  *
1660  * Remove a pg_attrdef entry specified by OID.  This is the guts of
1661  * attribute-default removal.  Note it should be called via performDeletion,
1662  * not directly.
1663  */
1664 void
1665 RemoveAttrDefaultById(Oid attrdefId)
1666 {
1667         Relation        attrdef_rel;
1668         Relation        attr_rel;
1669         Relation        myrel;
1670         ScanKeyData scankeys[1];
1671         SysScanDesc scan;
1672         HeapTuple       tuple;
1673         Oid                     myrelid;
1674         AttrNumber      myattnum;
1675
1676         /* Grab an appropriate lock on the pg_attrdef relation */
1677         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1678
1679         /* Find the pg_attrdef tuple */
1680         ScanKeyInit(&scankeys[0],
1681                                 ObjectIdAttributeNumber,
1682                                 BTEqualStrategyNumber, F_OIDEQ,
1683                                 ObjectIdGetDatum(attrdefId));
1684
1685         scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1686                                                           NULL, 1, scankeys);
1687
1688         tuple = systable_getnext(scan);
1689         if (!HeapTupleIsValid(tuple))
1690                 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1691
1692         myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1693         myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1694
1695         /* Get an exclusive lock on the relation owning the attribute */
1696         myrel = relation_open(myrelid, AccessExclusiveLock);
1697
1698         /* Now we can delete the pg_attrdef row */
1699         simple_heap_delete(attrdef_rel, &tuple->t_self);
1700
1701         systable_endscan(scan);
1702         heap_close(attrdef_rel, RowExclusiveLock);
1703
1704         /* Fix the pg_attribute row */
1705         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1706
1707         tuple = SearchSysCacheCopy2(ATTNUM,
1708                                                                 ObjectIdGetDatum(myrelid),
1709                                                                 Int16GetDatum(myattnum));
1710         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1711                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1712                          myattnum, myrelid);
1713
1714         ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1715
1716         simple_heap_update(attr_rel, &tuple->t_self, tuple);
1717
1718         /* keep the system catalog indexes current */
1719         CatalogUpdateIndexes(attr_rel, tuple);
1720
1721         /*
1722          * Our update of the pg_attribute row will force a relcache rebuild, so
1723          * there's nothing else to do here.
1724          */
1725         heap_close(attr_rel, RowExclusiveLock);
1726
1727         /* Keep lock on attribute's rel until end of xact */
1728         relation_close(myrel, NoLock);
1729 }
1730
1731 /*
1732  * heap_drop_with_catalog       - removes specified relation from catalogs
1733  *
1734  * Note that this routine is not responsible for dropping objects that are
1735  * linked to the pg_class entry via dependencies (for example, indexes and
1736  * constraints).  Those are deleted by the dependency-tracing logic in
1737  * dependency.c before control gets here.  In general, therefore, this routine
1738  * should never be called directly; go through performDeletion() instead.
1739  */
1740 void
1741 heap_drop_with_catalog(Oid relid)
1742 {
1743         Relation        rel;
1744
1745         /*
1746          * Open and lock the relation.
1747          */
1748         rel = relation_open(relid, AccessExclusiveLock);
1749
1750         /*
1751          * There can no longer be anyone *else* touching the relation, but we
1752          * might still have open queries or cursors, or pending trigger events, in
1753          * our own session.
1754          */
1755         CheckTableNotInUse(rel, "DROP TABLE");
1756
1757         /*
1758          * This effectively deletes all rows in the table, and may be done in a
1759          * serializable transaction.  In that case we must record a rw-conflict in
1760          * to this transaction from each transaction holding a predicate lock on
1761          * the table.
1762          */
1763         CheckTableForSerializableConflictIn(rel);
1764
1765         /*
1766          * Delete pg_foreign_table tuple first.
1767          */
1768         if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1769         {
1770                 Relation        rel;
1771                 HeapTuple       tuple;
1772
1773                 rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
1774
1775                 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1776                 if (!HeapTupleIsValid(tuple))
1777                         elog(ERROR, "cache lookup failed for foreign table %u", relid);
1778
1779                 simple_heap_delete(rel, &tuple->t_self);
1780
1781                 ReleaseSysCache(tuple);
1782                 heap_close(rel, RowExclusiveLock);
1783         }
1784
1785         /*
1786          * Schedule unlinking of the relation's physical files at commit.
1787          */
1788         if (rel->rd_rel->relkind != RELKIND_VIEW &&
1789                 rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1790                 rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
1791         {
1792                 RelationDropStorage(rel);
1793         }
1794
1795         /*
1796          * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1797          * until transaction commit.  This ensures no one else will try to do
1798          * something with the doomed relation.
1799          */
1800         relation_close(rel, NoLock);
1801
1802         /*
1803          * Forget any ON COMMIT action for the rel
1804          */
1805         remove_on_commit_action(relid);
1806
1807         /*
1808          * Flush the relation from the relcache.  We want to do this before
1809          * starting to remove catalog entries, just to be certain that no relcache
1810          * entry rebuild will happen partway through.  (That should not really
1811          * matter, since we don't do CommandCounterIncrement here, but let's be
1812          * safe.)
1813          */
1814         RelationForgetRelation(relid);
1815
1816         /*
1817          * remove inheritance information
1818          */
1819         RelationRemoveInheritance(relid);
1820
1821         /*
1822          * delete statistics
1823          */
1824         RemoveStatistics(relid, 0);
1825
1826         /*
1827          * delete attribute tuples
1828          */
1829         DeleteAttributeTuples(relid);
1830
1831         /*
1832          * delete relation tuple
1833          */
1834         DeleteRelationTuple(relid);
1835 }
1836
1837
1838 /*
1839  * Store a default expression for column attnum of relation rel.
1840  */
1841 void
1842 StoreAttrDefault(Relation rel, AttrNumber attnum,
1843                                  Node *expr, bool is_internal)
1844 {
1845         char       *adbin;
1846         char       *adsrc;
1847         Relation        adrel;
1848         HeapTuple       tuple;
1849         Datum           values[4];
1850         static bool nulls[4] = {false, false, false, false};
1851         Relation        attrrel;
1852         HeapTuple       atttup;
1853         Form_pg_attribute attStruct;
1854         Oid                     attrdefOid;
1855         ObjectAddress colobject,
1856                                 defobject;
1857
1858         /*
1859          * Flatten expression to string form for storage.
1860          */
1861         adbin = nodeToString(expr);
1862
1863         /*
1864          * Also deparse it to form the mostly-obsolete adsrc field.
1865          */
1866         adsrc = deparse_expression(expr,
1867                                                         deparse_context_for(RelationGetRelationName(rel),
1868                                                                                                 RelationGetRelid(rel)),
1869                                                            false, false);
1870
1871         /*
1872          * Make the pg_attrdef entry.
1873          */
1874         values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
1875         values[Anum_pg_attrdef_adnum - 1] = attnum;
1876         values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
1877         values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
1878
1879         adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1880
1881         tuple = heap_form_tuple(adrel->rd_att, values, nulls);
1882         attrdefOid = simple_heap_insert(adrel, tuple);
1883
1884         CatalogUpdateIndexes(adrel, tuple);
1885
1886         defobject.classId = AttrDefaultRelationId;
1887         defobject.objectId = attrdefOid;
1888         defobject.objectSubId = 0;
1889
1890         heap_close(adrel, RowExclusiveLock);
1891
1892         /* now can free some of the stuff allocated above */
1893         pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
1894         pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
1895         heap_freetuple(tuple);
1896         pfree(adbin);
1897         pfree(adsrc);
1898
1899         /*
1900          * Update the pg_attribute entry for the column to show that a default
1901          * exists.
1902          */
1903         attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
1904         atttup = SearchSysCacheCopy2(ATTNUM,
1905                                                                  ObjectIdGetDatum(RelationGetRelid(rel)),
1906                                                                  Int16GetDatum(attnum));
1907         if (!HeapTupleIsValid(atttup))
1908                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1909                          attnum, RelationGetRelid(rel));
1910         attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
1911         if (!attStruct->atthasdef)
1912         {
1913                 attStruct->atthasdef = true;
1914                 simple_heap_update(attrrel, &atttup->t_self, atttup);
1915                 /* keep catalog indexes current */
1916                 CatalogUpdateIndexes(attrrel, atttup);
1917         }
1918         heap_close(attrrel, RowExclusiveLock);
1919         heap_freetuple(atttup);
1920
1921         /*
1922          * Make a dependency so that the pg_attrdef entry goes away if the column
1923          * (or whole table) is deleted.
1924          */
1925         colobject.classId = RelationRelationId;
1926         colobject.objectId = RelationGetRelid(rel);
1927         colobject.objectSubId = attnum;
1928
1929         recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
1930
1931         /*
1932          * Record dependencies on objects used in the expression, too.
1933          */
1934         recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
1935
1936         /*
1937          * Post creation hook for attribute defaults.
1938          *
1939          * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
1940          * couple of deletion/creation of the attribute's default entry, so the
1941          * callee should check existence of an older version of this entry if it
1942          * needs to distinguish.
1943          */
1944         InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
1945                                                                   RelationGetRelid(rel), attnum, is_internal);
1946 }
1947
1948 /*
1949  * Store a check-constraint expression for the given relation.
1950  *
1951  * Caller is responsible for updating the count of constraints
1952  * in the pg_class entry for the relation.
1953  */
1954 static void
1955 StoreRelCheck(Relation rel, char *ccname, Node *expr,
1956                           bool is_validated, bool is_local, int inhcount,
1957                           bool is_no_inherit, bool is_internal)
1958 {
1959         char       *ccbin;
1960         char       *ccsrc;
1961         List       *varList;
1962         int                     keycount;
1963         int16      *attNos;
1964
1965         /*
1966          * Flatten expression to string form for storage.
1967          */
1968         ccbin = nodeToString(expr);
1969
1970         /*
1971          * Also deparse it to form the mostly-obsolete consrc field.
1972          */
1973         ccsrc = deparse_expression(expr,
1974                                                         deparse_context_for(RelationGetRelationName(rel),
1975                                                                                                 RelationGetRelid(rel)),
1976                                                            false, false);
1977
1978         /*
1979          * Find columns of rel that are used in expr
1980          *
1981          * NB: pull_var_clause is okay here only because we don't allow subselects
1982          * in check constraints; it would fail to examine the contents of
1983          * subselects.
1984          */
1985         varList = pull_var_clause(expr,
1986                                                           PVC_REJECT_AGGREGATES,
1987                                                           PVC_REJECT_PLACEHOLDERS);
1988         keycount = list_length(varList);
1989
1990         if (keycount > 0)
1991         {
1992                 ListCell   *vl;
1993                 int                     i = 0;
1994
1995                 attNos = (int16 *) palloc(keycount * sizeof(int16));
1996                 foreach(vl, varList)
1997                 {
1998                         Var                *var = (Var *) lfirst(vl);
1999                         int                     j;
2000
2001                         for (j = 0; j < i; j++)
2002                                 if (attNos[j] == var->varattno)
2003                                         break;
2004                         if (j == i)
2005                                 attNos[i++] = var->varattno;
2006                 }
2007                 keycount = i;
2008         }
2009         else
2010                 attNos = NULL;
2011
2012         /*
2013          * Create the Check Constraint
2014          */
2015         CreateConstraintEntry(ccname,           /* Constraint Name */
2016                                                   RelationGetNamespace(rel),    /* namespace */
2017                                                   CONSTRAINT_CHECK,             /* Constraint Type */
2018                                                   false,        /* Is Deferrable */
2019                                                   false,        /* Is Deferred */
2020                                                   is_validated,
2021                                                   RelationGetRelid(rel),                /* relation */
2022                                                   attNos,               /* attrs in the constraint */
2023                                                   keycount,             /* # attrs in the constraint */
2024                                                   InvalidOid,   /* not a domain constraint */
2025                                                   InvalidOid,   /* no associated index */
2026                                                   InvalidOid,   /* Foreign key fields */
2027                                                   NULL,
2028                                                   NULL,
2029                                                   NULL,
2030                                                   NULL,
2031                                                   0,
2032                                                   ' ',
2033                                                   ' ',
2034                                                   ' ',
2035                                                   NULL, /* not an exclusion constraint */
2036                                                   expr, /* Tree form of check constraint */
2037                                                   ccbin,        /* Binary form of check constraint */
2038                                                   ccsrc,        /* Source form of check constraint */
2039                                                   is_local,             /* conislocal */
2040                                                   inhcount,             /* coninhcount */
2041                                                   is_no_inherit,                /* connoinherit */
2042                                                   is_internal); /* internally constructed? */
2043
2044         pfree(ccbin);
2045         pfree(ccsrc);
2046 }
2047
2048 /*
2049  * Store defaults and constraints (passed as a list of CookedConstraint).
2050  *
2051  * NOTE: only pre-cooked expressions will be passed this way, which is to
2052  * say expressions inherited from an existing relation.  Newly parsed
2053  * expressions can be added later, by direct calls to StoreAttrDefault
2054  * and StoreRelCheck (see AddRelationNewConstraints()).
2055  */
2056 static void
2057 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2058 {
2059         int                     numchecks = 0;
2060         ListCell   *lc;
2061
2062         if (!cooked_constraints)
2063                 return;                                 /* nothing to do */
2064
2065         /*
2066          * Deparsing of constraint expressions will fail unless the just-created
2067          * pg_attribute tuples for this relation are made visible.      So, bump the
2068          * command counter.  CAUTION: this will cause a relcache entry rebuild.
2069          */
2070         CommandCounterIncrement();
2071
2072         foreach(lc, cooked_constraints)
2073         {
2074                 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2075
2076                 switch (con->contype)
2077                 {
2078                         case CONSTR_DEFAULT:
2079                                 StoreAttrDefault(rel, con->attnum, con->expr, is_internal);
2080                                 break;
2081                         case CONSTR_CHECK:
2082                                 StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
2083                                                           con->is_local, con->inhcount,
2084                                                           con->is_no_inherit, is_internal);
2085                                 numchecks++;
2086                                 break;
2087                         default:
2088                                 elog(ERROR, "unrecognized constraint type: %d",
2089                                          (int) con->contype);
2090                 }
2091         }
2092
2093         if (numchecks > 0)
2094                 SetRelationNumChecks(rel, numchecks);
2095 }
2096
2097 /*
2098  * AddRelationNewConstraints
2099  *
2100  * Add new column default expressions and/or constraint check expressions
2101  * to an existing relation.  This is defined to do both for efficiency in
2102  * DefineRelation, but of course you can do just one or the other by passing
2103  * empty lists.
2104  *
2105  * rel: relation to be modified
2106  * newColDefaults: list of RawColumnDefault structures
2107  * newConstraints: list of Constraint nodes
2108  * allow_merge: TRUE if check constraints may be merged with existing ones
2109  * is_local: TRUE if definition is local, FALSE if it's inherited
2110  * is_internal: TRUE if result of some internal process, not a user request
2111  *
2112  * All entries in newColDefaults will be processed.  Entries in newConstraints
2113  * will be processed only if they are CONSTR_CHECK type.
2114  *
2115  * Returns a list of CookedConstraint nodes that shows the cooked form of
2116  * the default and constraint expressions added to the relation.
2117  *
2118  * NB: caller should have opened rel with AccessExclusiveLock, and should
2119  * hold that lock till end of transaction.      Also, we assume the caller has
2120  * done a CommandCounterIncrement if necessary to make the relation's catalog
2121  * tuples visible.
2122  */
2123 List *
2124 AddRelationNewConstraints(Relation rel,
2125                                                   List *newColDefaults,
2126                                                   List *newConstraints,
2127                                                   bool allow_merge,
2128                                                   bool is_local,
2129                                                   bool is_internal)
2130 {
2131         List       *cookedConstraints = NIL;
2132         TupleDesc       tupleDesc;
2133         TupleConstr *oldconstr;
2134         int                     numoldchecks;
2135         ParseState *pstate;
2136         RangeTblEntry *rte;
2137         int                     numchecks;
2138         List       *checknames;
2139         ListCell   *cell;
2140         Node       *expr;
2141         CookedConstraint *cooked;
2142
2143         /*
2144          * Get info about existing constraints.
2145          */
2146         tupleDesc = RelationGetDescr(rel);
2147         oldconstr = tupleDesc->constr;
2148         if (oldconstr)
2149                 numoldchecks = oldconstr->num_check;
2150         else
2151                 numoldchecks = 0;
2152
2153         /*
2154          * Create a dummy ParseState and insert the target relation as its sole
2155          * rangetable entry.  We need a ParseState for transformExpr.
2156          */
2157         pstate = make_parsestate(NULL);
2158         rte = addRangeTableEntryForRelation(pstate,
2159                                                                                 rel,
2160                                                                                 NULL,
2161                                                                                 false,
2162                                                                                 true);
2163         addRTEtoQuery(pstate, rte, true, true, true);
2164
2165         /*
2166          * Process column default expressions.
2167          */
2168         foreach(cell, newColDefaults)
2169         {
2170                 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2171                 Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
2172
2173                 expr = cookDefault(pstate, colDef->raw_default,
2174                                                    atp->atttypid, atp->atttypmod,
2175                                                    NameStr(atp->attname));
2176
2177                 /*
2178                  * If the expression is just a NULL constant, we do not bother to make
2179                  * an explicit pg_attrdef entry, since the default behavior is
2180                  * equivalent.
2181                  *
2182                  * Note a nonobvious property of this test: if the column is of a
2183                  * domain type, what we'll get is not a bare null Const but a
2184                  * CoerceToDomain expr, so we will not discard the default.  This is
2185                  * critical because the column default needs to be retained to
2186                  * override any default that the domain might have.
2187                  */
2188                 if (expr == NULL ||
2189                         (IsA(expr, Const) &&((Const *) expr)->constisnull))
2190                         continue;
2191
2192                 StoreAttrDefault(rel, colDef->attnum, expr, is_internal);
2193
2194                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2195                 cooked->contype = CONSTR_DEFAULT;
2196                 cooked->name = NULL;
2197                 cooked->attnum = colDef->attnum;
2198                 cooked->expr = expr;
2199                 cooked->skip_validation = false;
2200                 cooked->is_local = is_local;
2201                 cooked->inhcount = is_local ? 0 : 1;
2202                 cooked->is_no_inherit = false;
2203                 cookedConstraints = lappend(cookedConstraints, cooked);
2204         }
2205
2206         /*
2207          * Process constraint expressions.
2208          */
2209         numchecks = numoldchecks;
2210         checknames = NIL;
2211         foreach(cell, newConstraints)
2212         {
2213                 Constraint *cdef = (Constraint *) lfirst(cell);
2214                 char       *ccname;
2215
2216                 if (cdef->contype != CONSTR_CHECK)
2217                         continue;
2218
2219                 if (cdef->raw_expr != NULL)
2220                 {
2221                         Assert(cdef->cooked_expr == NULL);
2222
2223                         /*
2224                          * Transform raw parsetree to executable expression, and verify
2225                          * it's valid as a CHECK constraint.
2226                          */
2227                         expr = cookConstraint(pstate, cdef->raw_expr,
2228                                                                   RelationGetRelationName(rel));
2229                 }
2230                 else
2231                 {
2232                         Assert(cdef->cooked_expr != NULL);
2233
2234                         /*
2235                          * Here, we assume the parser will only pass us valid CHECK
2236                          * expressions, so we do no particular checking.
2237                          */
2238                         expr = stringToNode(cdef->cooked_expr);
2239                 }
2240
2241                 /*
2242                  * Check name uniqueness, or generate a name if none was given.
2243                  */
2244                 if (cdef->conname != NULL)
2245                 {
2246                         ListCell   *cell2;
2247
2248                         ccname = cdef->conname;
2249                         /* Check against other new constraints */
2250                         /* Needed because we don't do CommandCounterIncrement in loop */
2251                         foreach(cell2, checknames)
2252                         {
2253                                 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2254                                         ereport(ERROR,
2255                                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
2256                                                          errmsg("check constraint \"%s\" already exists",
2257                                                                         ccname)));
2258                         }
2259
2260                         /* save name for future checks */
2261                         checknames = lappend(checknames, ccname);
2262
2263                         /*
2264                          * Check against pre-existing constraints.      If we are allowed to
2265                          * merge with an existing constraint, there's no more to do here.
2266                          * (We omit the duplicate constraint from the result, which is
2267                          * what ATAddCheckConstraint wants.)
2268                          */
2269                         if (MergeWithExistingConstraint(rel, ccname, expr,
2270                                                                                         allow_merge, is_local,
2271                                                                                         cdef->is_no_inherit))
2272                                 continue;
2273                 }
2274                 else
2275                 {
2276                         /*
2277                          * When generating a name, we want to create "tab_col_check" for a
2278                          * column constraint and "tab_check" for a table constraint.  We
2279                          * no longer have any info about the syntactic positioning of the
2280                          * constraint phrase, so we approximate this by seeing whether the
2281                          * expression references more than one column.  (If the user
2282                          * played by the rules, the result is the same...)
2283                          *
2284                          * Note: pull_var_clause() doesn't descend into sublinks, but we
2285                          * eliminated those above; and anyway this only needs to be an
2286                          * approximate answer.
2287                          */
2288                         List       *vars;
2289                         char       *colname;
2290
2291                         vars = pull_var_clause(expr,
2292                                                                    PVC_REJECT_AGGREGATES,
2293                                                                    PVC_REJECT_PLACEHOLDERS);
2294
2295                         /* eliminate duplicates */
2296                         vars = list_union(NIL, vars);
2297
2298                         if (list_length(vars) == 1)
2299                                 colname = get_attname(RelationGetRelid(rel),
2300                                                                           ((Var *) linitial(vars))->varattno);
2301                         else
2302                                 colname = NULL;
2303
2304                         ccname = ChooseConstraintName(RelationGetRelationName(rel),
2305                                                                                   colname,
2306                                                                                   "check",
2307                                                                                   RelationGetNamespace(rel),
2308                                                                                   checknames);
2309
2310                         /* save name for future checks */
2311                         checknames = lappend(checknames, ccname);
2312                 }
2313
2314                 /*
2315                  * OK, store it.
2316                  */
2317                 StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
2318                                           is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2319
2320                 numchecks++;
2321
2322                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2323                 cooked->contype = CONSTR_CHECK;
2324                 cooked->name = ccname;
2325                 cooked->attnum = 0;
2326                 cooked->expr = expr;
2327                 cooked->skip_validation = cdef->skip_validation;
2328                 cooked->is_local = is_local;
2329                 cooked->inhcount = is_local ? 0 : 1;
2330                 cooked->is_no_inherit = cdef->is_no_inherit;
2331                 cookedConstraints = lappend(cookedConstraints, cooked);
2332         }
2333
2334         /*
2335          * Update the count of constraints in the relation's pg_class tuple. We do
2336          * this even if there was no change, in order to ensure that an SI update
2337          * message is sent out for the pg_class tuple, which will force other
2338          * backends to rebuild their relcache entries for the rel. (This is
2339          * critical if we added defaults but not constraints.)
2340          */
2341         SetRelationNumChecks(rel, numchecks);
2342
2343         return cookedConstraints;
2344 }
2345
2346 /*
2347  * Check for a pre-existing check constraint that conflicts with a proposed
2348  * new one, and either adjust its conislocal/coninhcount settings or throw
2349  * error as needed.
2350  *
2351  * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
2352  * got a so-far-unique name, or throws error if conflict.
2353  *
2354  * XXX See MergeConstraintsIntoExisting too if you change this code.
2355  */
2356 static bool
2357 MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
2358                                                         bool allow_merge, bool is_local,
2359                                                         bool is_no_inherit)
2360 {
2361         bool            found;
2362         Relation        conDesc;
2363         SysScanDesc conscan;
2364         ScanKeyData skey[2];
2365         HeapTuple       tup;
2366
2367         /* Search for a pg_constraint entry with same name and relation */
2368         conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
2369
2370         found = false;
2371
2372         ScanKeyInit(&skey[0],
2373                                 Anum_pg_constraint_conname,
2374                                 BTEqualStrategyNumber, F_NAMEEQ,
2375                                 CStringGetDatum(ccname));
2376
2377         ScanKeyInit(&skey[1],
2378                                 Anum_pg_constraint_connamespace,
2379                                 BTEqualStrategyNumber, F_OIDEQ,
2380                                 ObjectIdGetDatum(RelationGetNamespace(rel)));
2381
2382         conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
2383                                                                  NULL, 2, skey);
2384
2385         while (HeapTupleIsValid(tup = systable_getnext(conscan)))
2386         {
2387                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2388
2389                 if (con->conrelid == RelationGetRelid(rel))
2390                 {
2391                         /* Found it.  Conflicts if not identical check constraint */
2392                         if (con->contype == CONSTRAINT_CHECK)
2393                         {
2394                                 Datum           val;
2395                                 bool            isnull;
2396
2397                                 val = fastgetattr(tup,
2398                                                                   Anum_pg_constraint_conbin,
2399                                                                   conDesc->rd_att, &isnull);
2400                                 if (isnull)
2401                                         elog(ERROR, "null conbin for rel %s",
2402                                                  RelationGetRelationName(rel));
2403                                 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2404                                         found = true;
2405                         }
2406                         if (!found || !allow_merge)
2407                                 ereport(ERROR,
2408                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2409                                 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2410                                            ccname, RelationGetRelationName(rel))));
2411
2412                         tup = heap_copytuple(tup);
2413                         con = (Form_pg_constraint) GETSTRUCT(tup);
2414
2415                         /* If the constraint is "no inherit" then cannot merge */
2416                         if (con->connoinherit)
2417                                 ereport(ERROR,
2418                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2419                                                  errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2420                                                                 ccname, RelationGetRelationName(rel))));
2421
2422                         if (is_local)
2423                                 con->conislocal = true;
2424                         else
2425                                 con->coninhcount++;
2426                         if (is_no_inherit)
2427                         {
2428                                 Assert(is_local);
2429                                 con->connoinherit = true;
2430                         }
2431                         /* OK to update the tuple */
2432                         ereport(NOTICE,
2433                            (errmsg("merging constraint \"%s\" with inherited definition",
2434                                            ccname)));
2435                         simple_heap_update(conDesc, &tup->t_self, tup);
2436                         CatalogUpdateIndexes(conDesc, tup);
2437                         break;
2438                 }
2439         }
2440
2441         systable_endscan(conscan);
2442         heap_close(conDesc, RowExclusiveLock);
2443
2444         return found;
2445 }
2446
2447 /*
2448  * Update the count of constraints in the relation's pg_class tuple.
2449  *
2450  * Caller had better hold exclusive lock on the relation.
2451  *
2452  * An important side effect is that a SI update message will be sent out for
2453  * the pg_class tuple, which will force other backends to rebuild their
2454  * relcache entries for the rel.  Also, this backend will rebuild its
2455  * own relcache entry at the next CommandCounterIncrement.
2456  */
2457 static void
2458 SetRelationNumChecks(Relation rel, int numchecks)
2459 {
2460         Relation        relrel;
2461         HeapTuple       reltup;
2462         Form_pg_class relStruct;
2463
2464         relrel = heap_open(RelationRelationId, RowExclusiveLock);
2465         reltup = SearchSysCacheCopy1(RELOID,
2466                                                                  ObjectIdGetDatum(RelationGetRelid(rel)));
2467         if (!HeapTupleIsValid(reltup))
2468                 elog(ERROR, "cache lookup failed for relation %u",
2469                          RelationGetRelid(rel));
2470         relStruct = (Form_pg_class) GETSTRUCT(reltup);
2471
2472         if (relStruct->relchecks != numchecks)
2473         {
2474                 relStruct->relchecks = numchecks;
2475
2476                 simple_heap_update(relrel, &reltup->t_self, reltup);
2477
2478                 /* keep catalog indexes current */
2479                 CatalogUpdateIndexes(relrel, reltup);
2480         }
2481         else
2482         {
2483                 /* Skip the disk update, but force relcache inval anyway */
2484                 CacheInvalidateRelcache(rel);
2485         }
2486
2487         heap_freetuple(reltup);
2488         heap_close(relrel, RowExclusiveLock);
2489 }
2490
2491 /*
2492  * Take a raw default and convert it to a cooked format ready for
2493  * storage.
2494  *
2495  * Parse state should be set up to recognize any vars that might appear
2496  * in the expression.  (Even though we plan to reject vars, it's more
2497  * user-friendly to give the correct error message than "unknown var".)
2498  *
2499  * If atttypid is not InvalidOid, coerce the expression to the specified
2500  * type (and typmod atttypmod).   attname is only needed in this case:
2501  * it is used in the error message, if any.
2502  */
2503 Node *
2504 cookDefault(ParseState *pstate,
2505                         Node *raw_default,
2506                         Oid atttypid,
2507                         int32 atttypmod,
2508                         char *attname)
2509 {
2510         Node       *expr;
2511
2512         Assert(raw_default != NULL);
2513
2514         /*
2515          * Transform raw parsetree to executable expression.
2516          */
2517         expr = transformExpr(pstate, raw_default, EXPR_KIND_COLUMN_DEFAULT);
2518
2519         /*
2520          * Make sure default expr does not refer to any vars (we need this check
2521          * since the pstate includes the target table).
2522          */
2523         if (contain_var_clause(expr))
2524                 ereport(ERROR,
2525                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2526                           errmsg("cannot use column references in default expression")));
2527
2528         /*
2529          * transformExpr() should have already rejected subqueries, aggregates,
2530          * and window functions, based on the EXPR_KIND_ for a default expression.
2531          *
2532          * It can't return a set either.
2533          */
2534         if (expression_returns_set(expr))
2535                 ereport(ERROR,
2536                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2537                                  errmsg("default expression must not return a set")));
2538
2539         /*
2540          * Coerce the expression to the correct type and typmod, if given. This
2541          * should match the parser's processing of non-defaulted expressions ---
2542          * see transformAssignedExpr().
2543          */
2544         if (OidIsValid(atttypid))
2545         {
2546                 Oid                     type_id = exprType(expr);
2547
2548                 expr = coerce_to_target_type(pstate, expr, type_id,
2549                                                                          atttypid, atttypmod,
2550                                                                          COERCION_ASSIGNMENT,
2551                                                                          COERCE_IMPLICIT_CAST,
2552                                                                          -1);
2553                 if (expr == NULL)
2554                         ereport(ERROR,
2555                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
2556                                          errmsg("column \"%s\" is of type %s"
2557                                                         " but default expression is of type %s",
2558                                                         attname,
2559                                                         format_type_be(atttypid),
2560                                                         format_type_be(type_id)),
2561                            errhint("You will need to rewrite or cast the expression.")));
2562         }
2563
2564         /*
2565          * Finally, take care of collations in the finished expression.
2566          */
2567         assign_expr_collations(pstate, expr);
2568
2569         return expr;
2570 }
2571
2572 /*
2573  * Take a raw CHECK constraint expression and convert it to a cooked format
2574  * ready for storage.
2575  *
2576  * Parse state must be set up to recognize any vars that might appear
2577  * in the expression.
2578  */
2579 static Node *
2580 cookConstraint(ParseState *pstate,
2581                            Node *raw_constraint,
2582                            char *relname)
2583 {
2584         Node       *expr;
2585
2586         /*
2587          * Transform raw parsetree to executable expression.
2588          */
2589         expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
2590
2591         /*
2592          * Make sure it yields a boolean result.
2593          */
2594         expr = coerce_to_boolean(pstate, expr, "CHECK");
2595
2596         /*
2597          * Take care of collations.
2598          */
2599         assign_expr_collations(pstate, expr);
2600
2601         /*
2602          * Make sure no outside relations are referred to (this is probably dead
2603          * code now that add_missing_from is history).
2604          */
2605         if (list_length(pstate->p_rtable) != 1)
2606                 ereport(ERROR,
2607                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2608                         errmsg("only table \"%s\" can be referenced in check constraint",
2609                                    relname)));
2610
2611         return expr;
2612 }
2613
2614
2615 /*
2616  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
2617  *
2618  * If attnum is zero, remove all entries for rel; else remove only the one(s)
2619  * for that column.
2620  */
2621 void
2622 RemoveStatistics(Oid relid, AttrNumber attnum)
2623 {
2624         Relation        pgstatistic;
2625         SysScanDesc scan;
2626         ScanKeyData key[2];
2627         int                     nkeys;
2628         HeapTuple       tuple;
2629
2630         pgstatistic = heap_open(StatisticRelationId, RowExclusiveLock);
2631
2632         ScanKeyInit(&key[0],
2633                                 Anum_pg_statistic_starelid,
2634                                 BTEqualStrategyNumber, F_OIDEQ,
2635                                 ObjectIdGetDatum(relid));
2636
2637         if (attnum == 0)
2638                 nkeys = 1;
2639         else
2640         {
2641                 ScanKeyInit(&key[1],
2642                                         Anum_pg_statistic_staattnum,
2643                                         BTEqualStrategyNumber, F_INT2EQ,
2644                                         Int16GetDatum(attnum));
2645                 nkeys = 2;
2646         }
2647
2648         scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
2649                                                           NULL, nkeys, key);
2650
2651         /* we must loop even when attnum != 0, in case of inherited stats */
2652         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
2653                 simple_heap_delete(pgstatistic, &tuple->t_self);
2654
2655         systable_endscan(scan);
2656
2657         heap_close(pgstatistic, RowExclusiveLock);
2658 }
2659
2660
2661 /*
2662  * RelationTruncateIndexes - truncate all indexes associated
2663  * with the heap relation to zero tuples.
2664  *
2665  * The routine will truncate and then reconstruct the indexes on
2666  * the specified relation.      Caller must hold exclusive lock on rel.
2667  */
2668 static void
2669 RelationTruncateIndexes(Relation heapRelation)
2670 {
2671         ListCell   *indlist;
2672
2673         /* Ask the relcache to produce a list of the indexes of the rel */
2674         foreach(indlist, RelationGetIndexList(heapRelation))
2675         {
2676                 Oid                     indexId = lfirst_oid(indlist);
2677                 Relation        currentIndex;
2678                 IndexInfo  *indexInfo;
2679
2680                 /* Open the index relation; use exclusive lock, just to be sure */
2681                 currentIndex = index_open(indexId, AccessExclusiveLock);
2682
2683                 /* Fetch info needed for index_build */
2684                 indexInfo = BuildIndexInfo(currentIndex);
2685
2686                 /*
2687                  * Now truncate the actual file (and discard buffers).
2688                  */
2689                 RelationTruncate(currentIndex, 0);
2690
2691                 /* Initialize the index and rebuild */
2692                 /* Note: we do not need to re-establish pkey setting */
2693                 index_build(heapRelation, currentIndex, indexInfo, false, true);
2694
2695                 /* We're done with this index */
2696                 index_close(currentIndex, NoLock);
2697         }
2698 }
2699
2700 /*
2701  *       heap_truncate
2702  *
2703  *       This routine deletes all data within all the specified relations.
2704  *
2705  * This is not transaction-safe!  There is another, transaction-safe
2706  * implementation in commands/tablecmds.c.      We now use this only for
2707  * ON COMMIT truncation of temporary tables, where it doesn't matter.
2708  */
2709 void
2710 heap_truncate(List *relids)
2711 {
2712         List       *relations = NIL;
2713         ListCell   *cell;
2714
2715         /* Open relations for processing, and grab exclusive access on each */
2716         foreach(cell, relids)
2717         {
2718                 Oid                     rid = lfirst_oid(cell);
2719                 Relation        rel;
2720
2721                 rel = heap_open(rid, AccessExclusiveLock);
2722                 relations = lappend(relations, rel);
2723         }
2724
2725         /* Don't allow truncate on tables that are referenced by foreign keys */
2726         heap_truncate_check_FKs(relations, true);
2727
2728         /* OK to do it */
2729         foreach(cell, relations)
2730         {
2731                 Relation        rel = lfirst(cell);
2732
2733                 /* Truncate the relation */
2734                 heap_truncate_one_rel(rel);
2735
2736                 /* Close the relation, but keep exclusive lock on it until commit */
2737                 heap_close(rel, NoLock);
2738         }
2739 }
2740
2741 /*
2742  *       heap_truncate_one_rel
2743  *
2744  *       This routine deletes all data within the specified relation.
2745  *
2746  * This is not transaction-safe, because the truncation is done immediately
2747  * and cannot be rolled back later.  Caller is responsible for having
2748  * checked permissions etc, and must have obtained AccessExclusiveLock.
2749  */
2750 void
2751 heap_truncate_one_rel(Relation rel)
2752 {
2753         Oid                     toastrelid;
2754
2755         /* Truncate the actual file (and discard buffers) */
2756         RelationTruncate(rel, 0);
2757
2758         /* If the relation has indexes, truncate the indexes too */
2759         RelationTruncateIndexes(rel);
2760
2761         /* If there is a toast table, truncate that too */
2762         toastrelid = rel->rd_rel->reltoastrelid;
2763         if (OidIsValid(toastrelid))
2764         {
2765                 Relation        toastrel = heap_open(toastrelid, AccessExclusiveLock);
2766
2767                 RelationTruncate(toastrel, 0);
2768                 RelationTruncateIndexes(toastrel);
2769                 /* keep the lock... */
2770                 heap_close(toastrel, NoLock);
2771         }
2772 }
2773
2774 /*
2775  * heap_truncate_check_FKs
2776  *              Check for foreign keys referencing a list of relations that
2777  *              are to be truncated, and raise error if there are any
2778  *
2779  * We disallow such FKs (except self-referential ones) since the whole point
2780  * of TRUNCATE is to not scan the individual rows to be thrown away.
2781  *
2782  * This is split out so it can be shared by both implementations of truncate.
2783  * Caller should already hold a suitable lock on the relations.
2784  *
2785  * tempTables is only used to select an appropriate error message.
2786  */
2787 void
2788 heap_truncate_check_FKs(List *relations, bool tempTables)
2789 {
2790         List       *oids = NIL;
2791         List       *dependents;
2792         ListCell   *cell;
2793
2794         /*
2795          * Build a list of OIDs of the interesting relations.
2796          *
2797          * If a relation has no triggers, then it can neither have FKs nor be
2798          * referenced by a FK from another table, so we can ignore it.
2799          */
2800         foreach(cell, relations)
2801         {
2802                 Relation        rel = lfirst(cell);
2803
2804                 if (rel->rd_rel->relhastriggers)
2805                         oids = lappend_oid(oids, RelationGetRelid(rel));
2806         }
2807
2808         /*
2809          * Fast path: if no relation has triggers, none has FKs either.
2810          */
2811         if (oids == NIL)
2812                 return;
2813
2814         /*
2815          * Otherwise, must scan pg_constraint.  We make one pass with all the
2816          * relations considered; if this finds nothing, then all is well.
2817          */
2818         dependents = heap_truncate_find_FKs(oids);
2819         if (dependents == NIL)
2820                 return;
2821
2822         /*
2823          * Otherwise we repeat the scan once per relation to identify a particular
2824          * pair of relations to complain about.  This is pretty slow, but
2825          * performance shouldn't matter much in a failure path.  The reason for
2826          * doing things this way is to ensure that the message produced is not
2827          * dependent on chance row locations within pg_constraint.
2828          */
2829         foreach(cell, oids)
2830         {
2831                 Oid                     relid = lfirst_oid(cell);
2832                 ListCell   *cell2;
2833
2834                 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
2835
2836                 foreach(cell2, dependents)
2837                 {
2838                         Oid                     relid2 = lfirst_oid(cell2);
2839
2840                         if (!list_member_oid(oids, relid2))
2841                         {
2842                                 char       *relname = get_rel_name(relid);
2843                                 char       *relname2 = get_rel_name(relid2);
2844
2845                                 if (tempTables)
2846                                         ereport(ERROR,
2847                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2848                                                          errmsg("unsupported ON COMMIT and foreign key combination"),
2849                                                          errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
2850                                                                            relname2, relname)));
2851                                 else
2852                                         ereport(ERROR,
2853                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2854                                                          errmsg("cannot truncate a table referenced in a foreign key constraint"),
2855                                                          errdetail("Table \"%s\" references \"%s\".",
2856                                                                            relname2, relname),
2857                                                    errhint("Truncate table \"%s\" at the same time, "
2858                                                                    "or use TRUNCATE ... CASCADE.",
2859                                                                    relname2)));
2860                         }
2861                 }
2862         }
2863 }
2864
2865 /*
2866  * heap_truncate_find_FKs
2867  *              Find relations having foreign keys referencing any of the given rels
2868  *
2869  * Input and result are both lists of relation OIDs.  The result contains
2870  * no duplicates, does *not* include any rels that were already in the input
2871  * list, and is sorted in OID order.  (The last property is enforced mainly
2872  * to guarantee consistent behavior in the regression tests; we don't want
2873  * behavior to change depending on chance locations of rows in pg_constraint.)
2874  *
2875  * Note: caller should already have appropriate lock on all rels mentioned
2876  * in relationIds.      Since adding or dropping an FK requires exclusive lock
2877  * on both rels, this ensures that the answer will be stable.
2878  */
2879 List *
2880 heap_truncate_find_FKs(List *relationIds)
2881 {
2882         List       *result = NIL;
2883         Relation        fkeyRel;
2884         SysScanDesc fkeyScan;
2885         HeapTuple       tuple;
2886
2887         /*
2888          * Must scan pg_constraint.  Right now, it is a seqscan because there is
2889          * no available index on confrelid.
2890          */
2891         fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
2892
2893         fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
2894                                                                   NULL, 0, NULL);
2895
2896         while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
2897         {
2898                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
2899
2900                 /* Not a foreign key */
2901                 if (con->contype != CONSTRAINT_FOREIGN)
2902                         continue;
2903
2904                 /* Not referencing one of our list of tables */
2905                 if (!list_member_oid(relationIds, con->confrelid))
2906                         continue;
2907
2908                 /* Add referencer unless already in input or result list */
2909                 if (!list_member_oid(relationIds, con->conrelid))
2910                         result = insert_ordered_unique_oid(result, con->conrelid);
2911         }
2912
2913         systable_endscan(fkeyScan);
2914         heap_close(fkeyRel, AccessShareLock);
2915
2916         return result;
2917 }
2918
2919 /*
2920  * insert_ordered_unique_oid
2921  *              Insert a new Oid into a sorted list of Oids, preserving ordering,
2922  *              and eliminating duplicates
2923  *
2924  * Building the ordered list this way is O(N^2), but with a pretty small
2925  * constant, so for the number of entries we expect it will probably be
2926  * faster than trying to apply qsort().  It seems unlikely someone would be
2927  * trying to truncate a table with thousands of dependent tables ...
2928  */
2929 static List *
2930 insert_ordered_unique_oid(List *list, Oid datum)
2931 {
2932         ListCell   *prev;
2933
2934         /* Does the datum belong at the front? */
2935         if (list == NIL || datum < linitial_oid(list))
2936                 return lcons_oid(datum, list);
2937         /* Does it match the first entry? */
2938         if (datum == linitial_oid(list))
2939                 return list;                    /* duplicate, so don't insert */
2940         /* No, so find the entry it belongs after */
2941         prev = list_head(list);
2942         for (;;)
2943         {
2944                 ListCell   *curr = lnext(prev);
2945
2946                 if (curr == NULL || datum < lfirst_oid(curr))
2947                         break;                          /* it belongs after 'prev', before 'curr' */
2948
2949                 if (datum == lfirst_oid(curr))
2950                         return list;            /* duplicate, so don't insert */
2951
2952                 prev = curr;
2953         }
2954         /* Insert datum into list after 'prev' */
2955         lappend_cell_oid(list, prev, datum);
2956         return list;
2957 }