]> granicus.if.org Git - postgresql/blob - src/backend/catalog/namespace.c
Reimplement temp tables using schemas. The temp table map is history;
[postgresql] / src / backend / catalog / namespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * namespace.c
4  *        code to support accessing and searching namespaces
5  *
6  * This is separate from pg_namespace.c, which contains the routines that
7  * directly manipulate the pg_namespace system catalog.  This module
8  * provides routines associated with defining a "namespace search path"
9  * and implementing search-path-controlled searches.
10  *
11  *
12  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.4 2002/03/31 06:26:30 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include "access/heapam.h"
23 #include "access/xact.h"
24 #include "catalog/catname.h"
25 #include "catalog/heap.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_shadow.h"
30 #include "miscadmin.h"
31 #include "nodes/makefuncs.h"
32 #include "storage/backendid.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/syscache.h"
36
37
38 /*
39  * The namespace search path is a possibly-empty list of namespace OIDs.
40  * In addition to the explicit list, the TEMP table namespace is always
41  * implicitly searched first (if it's been initialized).  Also, the system
42  * catalog namespace is always searched.  If the system namespace is
43  * explicitly present in the path then it will be searched in the specified
44  * order; otherwise it will be searched after TEMP tables and *before* the
45  * explicit list.  (It might seem that the system namespace should be
46  * implicitly last, but this behavior appears to be required by SQL99.
47  * Also, this provides a way to search the system namespace first without
48  * thereby making it the default creation target namespace.)
49  *
50  * The default creation target namespace is kept equal to the first element
51  * of the explicit list, or is the system namespace if the list is empty.
52  *
53  * In bootstrap mode or a standalone backend, the default search path is
54  * empty, so that the system namespace is the only one searched or inserted
55  * into.  In multiuser mode, the default search path contains the PG_PUBLIC
56  * namespace, preceded by the user's own namespace if one exists.
57  */
58
59 static List *namespaceSearchPath = NIL;
60
61 /* this flag must be updated correctly when namespaceSearchPath is changed */
62 static bool pathContainsSystemNamespace = false;
63
64 /* default place to create stuff */
65 static Oid      defaultCreationNamespace = PG_CATALOG_NAMESPACE;
66
67 /*
68  * myTempNamespace is InvalidOid until and unless a TEMP namespace is set up
69  * in a particular backend session (this happens when a CREATE TEMP TABLE
70  * command is first executed).  Thereafter it's the OID of the temp namespace.
71  */
72 static Oid      myTempNamespace = InvalidOid;
73
74
75 /*
76  * Deletion ordering constraint item.
77  */
78 typedef struct DelConstraint
79 {
80         Oid                     referencer;             /* table to delete first */
81         Oid                     referencee;             /* table to delete second */
82         int                     pred;                   /* workspace for TopoSortRels */
83         struct DelConstraint *link;     /* workspace for TopoSortRels */
84 } DelConstraint;
85
86
87 /* Local functions */
88 static Oid      GetTempTableNamespace(void);
89 static void RemoveTempRelations(Oid tempNamespaceId);
90 static List *FindTempRelations(Oid tempNamespaceId);
91 static List *FindDeletionConstraints(List *relOids);
92 static List *TopoSortRels(List *relOids, List *constraintList);
93 static void RemoveTempRelationsCallback(void);
94
95
96 /*
97  * RangeVarGetRelid
98  *              Given a RangeVar describing an existing relation,
99  *              select the proper namespace and look up the relation OID.
100  *
101  * If the relation is not found, return InvalidOid if failOK = true,
102  * otherwise raise an error.
103  */
104 Oid
105 RangeVarGetRelid(const RangeVar *relation, bool failOK)
106 {
107         Oid                     namespaceId;
108         Oid                     relId;
109
110         /*
111          * We check the catalog name and then ignore it.
112          */
113         if (relation->catalogname)
114         {
115                 if (strcmp(relation->catalogname, DatabaseName) != 0)
116                         elog(ERROR, "Cross-database references are not implemented");
117         }
118
119         if (relation->schemaname)
120         {
121                 /* use exact schema given */
122                 namespaceId = GetSysCacheOid(NAMESPACENAME,
123                                                                          CStringGetDatum(relation->schemaname),
124                                                                          0, 0, 0);
125                 if (!OidIsValid(namespaceId))
126                         elog(ERROR, "Namespace \"%s\" does not exist",
127                                  relation->schemaname);
128                 relId = get_relname_relid(relation->relname, namespaceId);
129         }
130         else
131         {
132                 /* search the namespace path */
133                 relId = RelnameGetRelid(relation->relname);
134         }
135
136         if (!OidIsValid(relId) && !failOK)
137         {
138                 if (relation->schemaname)
139                         elog(ERROR, "Relation \"%s\".\"%s\" does not exist",
140                                  relation->schemaname, relation->relname);
141                 else
142                         elog(ERROR, "Relation \"%s\" does not exist",
143                                  relation->relname);
144         }
145         return relId;
146 }
147
148 /*
149  * RangeVarGetCreationNamespace
150  *              Given a RangeVar describing a to-be-created relation,
151  *              choose which namespace to create it in.
152  *
153  * Note: calling this may result in a CommandCounterIncrement operation.
154  * That will happen on the first request for a temp table in any particular
155  * backend run; we will need to either create or clean out the temp schema.
156  */
157 Oid
158 RangeVarGetCreationNamespace(const RangeVar *newRelation)
159 {
160         Oid                     namespaceId;
161
162         /*
163          * We check the catalog name and then ignore it.
164          */
165         if (newRelation->catalogname)
166         {
167                 if (strcmp(newRelation->catalogname, DatabaseName) != 0)
168                         elog(ERROR, "Cross-database references are not implemented");
169         }
170
171         if (newRelation->istemp)
172         {
173                 /* TEMP tables are created in our backend-local temp namespace */
174                 if (newRelation->schemaname)
175                         elog(ERROR, "TEMP tables may not specify a namespace");
176                 /* Initialize temp namespace if first time through */
177                 if (!OidIsValid(myTempNamespace))
178                         myTempNamespace = GetTempTableNamespace();
179                 return myTempNamespace;
180         }
181
182         if (newRelation->schemaname)
183         {
184                 /* use exact schema given */
185                 namespaceId = GetSysCacheOid(NAMESPACENAME,
186                                                                          CStringGetDatum(newRelation->schemaname),
187                                                                          0, 0, 0);
188                 if (!OidIsValid(namespaceId))
189                         elog(ERROR, "Namespace \"%s\" does not exist",
190                                  newRelation->schemaname);
191         }
192         else
193         {
194                 /* use the default creation namespace */
195                 namespaceId = defaultCreationNamespace;
196         }
197
198         return namespaceId;
199 }
200
201 /*
202  * RelnameGetRelid
203  *              Try to resolve an unqualified relation name.
204  *              Returns OID if relation found in search path, else InvalidOid.
205  */
206 Oid
207 RelnameGetRelid(const char *relname)
208 {
209         Oid                     relid;
210         List       *lptr;
211
212         /*
213          * If a TEMP-table namespace has been set up, it is implicitly first
214          * in the search path.
215          */
216         if (OidIsValid(myTempNamespace))
217         {
218                 relid = get_relname_relid(relname, myTempNamespace);
219                 if (OidIsValid(relid))
220                         return relid;
221         }
222
223         /*
224          * If system namespace is not in path, implicitly search it before path
225          */
226         if (!pathContainsSystemNamespace)
227         {
228                 relid = get_relname_relid(relname, PG_CATALOG_NAMESPACE);
229                 if (OidIsValid(relid))
230                         return relid;
231         }
232
233         /*
234          * Else search the path
235          */
236         foreach(lptr, namespaceSearchPath)
237         {
238                 Oid                     namespaceId = (Oid) lfirsti(lptr);
239
240                 relid = get_relname_relid(relname, namespaceId);
241                 if (OidIsValid(relid))
242                         return relid;
243         }
244
245         /* Not found in path */
246         return InvalidOid;
247 }
248
249 /*
250  * TypenameGetTypid
251  *              Try to resolve an unqualified datatype name.
252  *              Returns OID if type found in search path, else InvalidOid.
253  *
254  * This is essentially the same as RelnameGetRelid, but we never search
255  * the TEMP table namespace --- there is no reason to refer to the types
256  * of temp tables, AFAICS.
257  */
258 Oid
259 TypenameGetTypid(const char *typname)
260 {
261         Oid                     typid;
262         List       *lptr;
263
264         /*
265          * If system namespace is not in path, implicitly search it before path
266          */
267         if (!pathContainsSystemNamespace)
268         {
269                 typid = GetSysCacheOid(TYPENAMENSP,
270                                                            PointerGetDatum(typname),
271                                                            ObjectIdGetDatum(PG_CATALOG_NAMESPACE),
272                                                            0, 0);
273                 if (OidIsValid(typid))
274                         return typid;
275         }
276
277         /*
278          * Else search the path
279          */
280         foreach(lptr, namespaceSearchPath)
281         {
282                 Oid                     namespaceId = (Oid) lfirsti(lptr);
283
284                 typid = GetSysCacheOid(TYPENAMENSP,
285                                                            PointerGetDatum(typname),
286                                                            ObjectIdGetDatum(namespaceId),
287                                                            0, 0);
288                 if (OidIsValid(typid))
289                         return typid;
290         }
291
292         /* Not found in path */
293         return InvalidOid;
294 }
295
296 /*
297  * QualifiedNameGetCreationNamespace
298  *              Given a possibly-qualified name for an object (in List-of-Values
299  *              format), determine what namespace the object should be created in.
300  *              Also extract and return the object name (last component of list).
301  *
302  * This is *not* used for tables.  Hence, the TEMP table namespace is
303  * never selected as the creation target.
304  */
305 Oid
306 QualifiedNameGetCreationNamespace(List *names, char **objname_p)
307 {
308         char       *catalogname;
309         char       *schemaname = NULL;
310         char       *objname = NULL;
311         Oid                     namespaceId;
312
313         /* deconstruct the name list */
314         switch (length(names))
315         {
316                 case 1:
317                         objname = strVal(lfirst(names));
318                         break;
319                 case 2:
320                         schemaname = strVal(lfirst(names));
321                         objname = strVal(lsecond(names));
322                         break;
323                 case 3:
324                         catalogname = strVal(lfirst(names));
325                         schemaname = strVal(lsecond(names));
326                         objname = strVal(lfirst(lnext(lnext(names))));
327                         /*
328                          * We check the catalog name and then ignore it.
329                          */
330                         if (strcmp(catalogname, DatabaseName) != 0)
331                                 elog(ERROR, "Cross-database references are not implemented");
332                         break;
333                 default:
334                         elog(ERROR, "Improper qualified name (too many dotted names)");
335                         break;
336         }
337
338         if (schemaname)
339         {
340                 /* use exact schema given */
341                 namespaceId = GetSysCacheOid(NAMESPACENAME,
342                                                                          CStringGetDatum(schemaname),
343                                                                          0, 0, 0);
344                 if (!OidIsValid(namespaceId))
345                         elog(ERROR, "Namespace \"%s\" does not exist",
346                                  schemaname);
347         }
348         else
349         {
350                 /* use the default creation namespace */
351                 namespaceId = defaultCreationNamespace;
352         }
353
354         *objname_p = objname;
355         return namespaceId;
356 }
357
358 /*
359  * makeRangeVarFromNameList
360  *              Utility routine to convert a qualified-name list into RangeVar form.
361  */
362 RangeVar *
363 makeRangeVarFromNameList(List *names)
364 {
365         RangeVar   *rel = makeRangeVar(NULL, NULL);
366
367         switch (length(names))
368         {
369                 case 1:
370                         rel->relname = strVal(lfirst(names));
371                         break;
372                 case 2:
373                         rel->schemaname = strVal(lfirst(names));
374                         rel->relname = strVal(lsecond(names));
375                         break;
376                 case 3:
377                         rel->catalogname = strVal(lfirst(names));
378                         rel->schemaname = strVal(lsecond(names));
379                         rel->relname = strVal(lfirst(lnext(lnext(names))));
380                         break;
381                 default:
382                         elog(ERROR, "Improper relation name (too many dotted names)");
383                         break;
384         }
385
386         return rel;
387 }
388
389 /*
390  * isTempNamespace - is the given namespace my temporary-table namespace?
391  */
392 bool
393 isTempNamespace(Oid namespaceId)
394 {
395         if (OidIsValid(myTempNamespace) && myTempNamespace == namespaceId)
396                 return true;
397         return false;
398 }
399
400 /*
401  * GetTempTableNamespace
402  *              Initialize temp table namespace on first use in a particular backend
403  */
404 static Oid
405 GetTempTableNamespace(void)
406 {
407         char            namespaceName[NAMEDATALEN];
408         Oid                     namespaceId;
409
410         snprintf(namespaceName, NAMEDATALEN, "pg_temp_%d", MyBackendId);
411
412         namespaceId = GetSysCacheOid(NAMESPACENAME,
413                                                                  CStringGetDatum(namespaceName),
414                                                                  0, 0, 0);
415         if (!OidIsValid(namespaceId))
416         {
417                 /*
418                  * First use of this temp namespace in this database; create it.
419                  * The temp namespaces are always owned by the superuser.
420                  */
421                 namespaceId = NamespaceCreate(namespaceName, BOOTSTRAP_USESYSID);
422                 /* Advance command counter to make namespace visible */
423                 CommandCounterIncrement();
424         }
425         else
426         {
427                 /*
428                  * If the namespace already exists, clean it out (in case the
429                  * former owner crashed without doing so).
430                  */
431                 RemoveTempRelations(namespaceId);
432         }
433
434         /*
435          * Register exit callback to clean out temp tables at backend shutdown.
436          */
437         on_shmem_exit(RemoveTempRelationsCallback, 0);
438
439         return namespaceId;
440 }
441
442 /*
443  * Remove all relations in the specified temp namespace.
444  *
445  * This is called at backend shutdown (if we made any temp relations).
446  * It is also called when we begin using a pre-existing temp namespace,
447  * in order to clean out any relations that might have been created by
448  * a crashed backend.
449  */
450 static void
451 RemoveTempRelations(Oid tempNamespaceId)
452 {
453         List       *tempRelList;
454         List       *constraintList;
455         List       *lptr;
456
457         /* Get a list of relations to delete */
458         tempRelList = FindTempRelations(tempNamespaceId);
459
460         if (tempRelList == NIL)
461                 return;                                 /* nothing to do */
462
463         /* If more than one, sort them to respect any deletion-order constraints */
464         if (length(tempRelList) > 1)
465         {
466                 constraintList = FindDeletionConstraints(tempRelList);
467                 if (constraintList != NIL)
468                         tempRelList = TopoSortRels(tempRelList, constraintList);
469         }
470
471         /* Scan the list and delete all entries */
472         foreach(lptr, tempRelList)
473         {
474                 Oid                     reloid = (Oid) lfirsti(lptr);
475
476                 heap_drop_with_catalog(reloid, true);
477                 /*
478                  * Advance cmd counter to make catalog changes visible, in case
479                  * a later entry depends on this one.
480                  */
481                 CommandCounterIncrement();
482         }
483 }
484
485 /*
486  * Find all relations in the specified temp namespace.
487  *
488  * Returns a list of relation OIDs.
489  */
490 static List *
491 FindTempRelations(Oid tempNamespaceId)
492 {
493         List       *tempRelList = NIL;
494         Relation        pgclass;
495         HeapScanDesc scan;
496         HeapTuple       tuple;
497         ScanKeyData key;
498
499         /*
500          * Scan pg_class to find all the relations in the target namespace.
501          * Ignore indexes, though, on the assumption that they'll go away
502          * when their tables are deleted.
503          */
504         ScanKeyEntryInitialize(&key, 0x0,
505                                                    Anum_pg_class_relnamespace,
506                                                    F_OIDEQ,
507                                                    ObjectIdGetDatum(tempNamespaceId));
508
509         pgclass = heap_openr(RelationRelationName, AccessShareLock);
510         scan = heap_beginscan(pgclass, false, SnapshotNow, 1, &key);
511
512         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
513         {
514                 switch (((Form_pg_class) GETSTRUCT(tuple))->relkind)
515                 {
516                         case RELKIND_RELATION:
517                         case RELKIND_SEQUENCE:
518                         case RELKIND_VIEW:
519                                 tempRelList = lconsi(tuple->t_data->t_oid, tempRelList);
520                                 break;
521                         default:
522                                 break;
523                 }
524         }
525
526         heap_endscan(scan);
527         heap_close(pgclass, AccessShareLock);
528
529         return tempRelList;
530 }
531
532 /*
533  * Find deletion-order constraints involving the given relation OIDs.
534  *
535  * Returns a list of DelConstraint objects.
536  */
537 static List *
538 FindDeletionConstraints(List *relOids)
539 {
540         List       *constraintList = NIL;
541         Relation        inheritsrel;
542         HeapScanDesc scan;
543         HeapTuple       tuple;
544
545         /*
546          * Scan pg_inherits to find parents and children that are in the list.
547          */
548         inheritsrel = heap_openr(InheritsRelationName, AccessShareLock);
549         scan = heap_beginscan(inheritsrel, 0, SnapshotNow, 0, NULL);
550
551         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
552         {
553                 Oid             inhrelid = ((Form_pg_inherits) GETSTRUCT(tuple))->inhrelid;
554                 Oid             inhparent = ((Form_pg_inherits) GETSTRUCT(tuple))->inhparent;
555
556                 if (intMember(inhrelid, relOids) && intMember(inhparent, relOids))
557                 {
558                         DelConstraint  *item;
559
560                         item = (DelConstraint *) palloc(sizeof(DelConstraint));
561                         item->referencer = inhrelid;
562                         item->referencee = inhparent;
563                         constraintList = lcons(item, constraintList);
564                 }
565         }
566
567         heap_endscan(scan);
568         heap_close(inheritsrel, AccessShareLock);
569
570         return constraintList;
571 }
572
573 /*
574  * TopoSortRels -- topological sort of a list of rels to delete
575  *
576  * This is a lot simpler and slower than, for example, the topological sort
577  * algorithm shown in Knuth's Volume 1.  However, we are not likely to be
578  * working with more than a few constraints, so the apparent slowness of the
579  * algorithm won't really matter.
580  */
581 static List *
582 TopoSortRels(List *relOids, List *constraintList)
583 {
584         int                     queue_size = length(relOids);
585         Oid                *rels;
586         int                *beforeConstraints;
587         DelConstraint **afterConstraints;
588         List       *resultList = NIL;
589         List       *lptr;
590         int                     i,
591                                 j,
592                                 k,
593                                 last;
594
595         /* Allocate workspace */
596         rels = (Oid *) palloc(queue_size * sizeof(Oid));
597         beforeConstraints = (int *) palloc(queue_size * sizeof(int));
598         afterConstraints = (DelConstraint **)
599                 palloc(queue_size * sizeof(DelConstraint*));
600
601         /* Build an array of the target relation OIDs */
602         i = 0;
603         foreach(lptr, relOids)
604         {
605                 rels[i++] = (Oid) lfirsti(lptr);
606         }
607
608         /*
609          * Scan the constraints, and for each rel in the array, generate a
610          * count of the number of constraints that say it must be before
611          * something else, plus a list of the constraints that say it must be
612          * after something else. The count for the j'th rel is stored in
613          * beforeConstraints[j], and the head of its list in
614          * afterConstraints[j].  Each constraint stores its list link in
615          * its link field (note any constraint will be in just one list).
616          * The array index for the before-rel of each constraint is
617          * remembered in the constraint's pred field.
618          */
619         MemSet(beforeConstraints, 0, queue_size * sizeof(int));
620         MemSet(afterConstraints, 0, queue_size * sizeof(DelConstraint*));
621         foreach(lptr, constraintList)
622         {
623                 DelConstraint  *constraint = (DelConstraint *) lfirst(lptr);
624                 Oid                     rel;
625
626                 /* Find the referencer rel in the array */
627                 rel = constraint->referencer;
628                 for (j = queue_size; --j >= 0;)
629                 {
630                         if (rels[j] == rel)
631                                 break;
632                 }
633                 Assert(j >= 0);                 /* should have found a match */
634                 /* Find the referencee rel in the array */
635                 rel = constraint->referencee;
636                 for (k = queue_size; --k >= 0;)
637                 {
638                         if (rels[k] == rel)
639                                 break;
640                 }
641                 Assert(k >= 0);                 /* should have found a match */
642                 beforeConstraints[j]++; /* referencer must come before */
643                 /* add this constraint to list of after-constraints for referencee */
644                 constraint->pred = j;
645                 constraint->link = afterConstraints[k];
646                 afterConstraints[k] = constraint;
647         }
648         /*--------------------
649          * Now scan the rels array backwards.   At each step, output the
650          * last rel that has no remaining before-constraints, and decrease
651          * the beforeConstraints count of each of the rels it was constrained
652          * against.  (This is the right order since we are building the result
653          * list back-to-front.)
654          * i = counter for number of rels left to output
655          * j = search index for rels[]
656          * dc = temp for scanning constraint list for rel j
657          * last = last valid index in rels (avoid redundant searches)
658          *--------------------
659          */
660         last = queue_size - 1;
661         for (i = queue_size; --i >= 0;)
662         {
663                 DelConstraint  *dc;
664
665                 /* Find next candidate to output */
666                 while (rels[last] == InvalidOid)
667                         last--;
668                 for (j = last; j >= 0; j--)
669                 {
670                         if (rels[j] != InvalidOid && beforeConstraints[j] == 0)
671                                 break;
672                 }
673                 /* If no available candidate, topological sort fails */
674                 if (j < 0)
675                         elog(ERROR, "TopoSortRels: failed to find a workable deletion ordering");
676                 /* Output candidate, and mark it done by zeroing rels[] entry */
677                 resultList = lconsi(rels[j], resultList);
678                 rels[j] = InvalidOid;
679                 /* Update beforeConstraints counts of its predecessors */
680                 for (dc = afterConstraints[j]; dc; dc = dc->link)
681                         beforeConstraints[dc->pred]--;
682         }
683
684         /* Done */
685         return resultList;
686 }
687
688 /*
689  * Callback to remove temp relations at backend exit.
690  */
691 static void
692 RemoveTempRelationsCallback(void)
693 {
694         if (OidIsValid(myTempNamespace)) /* should always be true */
695         {
696                 /* Need to ensure we have a usable transaction. */
697                 AbortOutOfAnyTransaction();
698                 StartTransactionCommand();
699
700                 RemoveTempRelations(myTempNamespace);
701
702                 CommitTransactionCommand();
703         }
704 }