]> granicus.if.org Git - postgresql/blob - src/backend/catalog/pg_inherits.c
Support domains over composite types.
[postgresql] / src / backend / catalog / pg_inherits.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_inherits.c
4  *        routines to support manipulation of the pg_inherits relation
5  *
6  * Note: currently, this module only contains inquiry functions; the actual
7  * creation and deletion of pg_inherits entries is done in tablecmds.c.
8  * Perhaps someday that code should be moved here, but it'd have to be
9  * disentangled from other stuff such as pg_depend updates.
10  *
11  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        src/backend/catalog/pg_inherits.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "catalog/indexing.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_inherits_fn.h"
28 #include "parser/parse_type.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/memutils.h"
33 #include "utils/syscache.h"
34 #include "utils/tqual.h"
35
36 /*
37  * Entry of a hash table used in find_all_inheritors. See below.
38  */
39 typedef struct SeenRelsEntry
40 {
41         Oid                     rel_id;                 /* relation oid */
42         ListCell   *numparents_cell;    /* corresponding list cell */
43 } SeenRelsEntry;
44
45 /*
46  * find_inheritance_children
47  *
48  * Returns a list containing the OIDs of all relations which
49  * inherit *directly* from the relation with OID 'parentrelId'.
50  *
51  * The specified lock type is acquired on each child relation (but not on the
52  * given rel; caller should already have locked it).  If lockmode is NoLock
53  * then no locks are acquired, but caller must beware of race conditions
54  * against possible DROPs of child relations.
55  */
56 List *
57 find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
58 {
59         List       *list = NIL;
60         Relation        relation;
61         SysScanDesc scan;
62         ScanKeyData key[1];
63         HeapTuple       inheritsTuple;
64         Oid                     inhrelid;
65         Oid                *oidarr;
66         int                     maxoids,
67                                 numoids,
68                                 i;
69
70         /*
71          * Can skip the scan if pg_class shows the relation has never had a
72          * subclass.
73          */
74         if (!has_subclass(parentrelId))
75                 return NIL;
76
77         /*
78          * Scan pg_inherits and build a working array of subclass OIDs.
79          */
80         maxoids = 32;
81         oidarr = (Oid *) palloc(maxoids * sizeof(Oid));
82         numoids = 0;
83
84         relation = heap_open(InheritsRelationId, AccessShareLock);
85
86         ScanKeyInit(&key[0],
87                                 Anum_pg_inherits_inhparent,
88                                 BTEqualStrategyNumber, F_OIDEQ,
89                                 ObjectIdGetDatum(parentrelId));
90
91         scan = systable_beginscan(relation, InheritsParentIndexId, true,
92                                                           NULL, 1, key);
93
94         while ((inheritsTuple = systable_getnext(scan)) != NULL)
95         {
96                 inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
97                 if (numoids >= maxoids)
98                 {
99                         maxoids *= 2;
100                         oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid));
101                 }
102                 oidarr[numoids++] = inhrelid;
103         }
104
105         systable_endscan(scan);
106
107         heap_close(relation, AccessShareLock);
108
109         /*
110          * If we found more than one child, sort them by OID.  This ensures
111          * reasonably consistent behavior regardless of the vagaries of an
112          * indexscan.  This is important since we need to be sure all backends
113          * lock children in the same order to avoid needless deadlocks.
114          */
115         if (numoids > 1)
116                 qsort(oidarr, numoids, sizeof(Oid), oid_cmp);
117
118         /*
119          * Acquire locks and build the result list.
120          */
121         for (i = 0; i < numoids; i++)
122         {
123                 inhrelid = oidarr[i];
124
125                 if (lockmode != NoLock)
126                 {
127                         /* Get the lock to synchronize against concurrent drop */
128                         LockRelationOid(inhrelid, lockmode);
129
130                         /*
131                          * Now that we have the lock, double-check to see if the relation
132                          * really exists or not.  If not, assume it was dropped while we
133                          * waited to acquire lock, and ignore it.
134                          */
135                         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid)))
136                         {
137                                 /* Release useless lock */
138                                 UnlockRelationOid(inhrelid, lockmode);
139                                 /* And ignore this relation */
140                                 continue;
141                         }
142                 }
143
144                 list = lappend_oid(list, inhrelid);
145         }
146
147         pfree(oidarr);
148
149         return list;
150 }
151
152
153 /*
154  * find_all_inheritors -
155  *              Returns a list of relation OIDs including the given rel plus
156  *              all relations that inherit from it, directly or indirectly.
157  *              Optionally, it also returns the number of parents found for
158  *              each such relation within the inheritance tree rooted at the
159  *              given rel.
160  *
161  * The specified lock type is acquired on all child relations (but not on the
162  * given rel; caller should already have locked it).  If lockmode is NoLock
163  * then no locks are acquired, but caller must beware of race conditions
164  * against possible DROPs of child relations.
165  */
166 List *
167 find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
168 {
169         /* hash table for O(1) rel_oid -> rel_numparents cell lookup */
170         HTAB       *seen_rels;
171         HASHCTL         ctl;
172         List       *rels_list,
173                            *rel_numparents;
174         ListCell   *l;
175
176         memset(&ctl, 0, sizeof(ctl));
177         ctl.keysize = sizeof(Oid);
178         ctl.entrysize = sizeof(SeenRelsEntry);
179         ctl.hcxt = CurrentMemoryContext;
180
181         seen_rels = hash_create("find_all_inheritors temporary table",
182                                                         32, /* start small and extend */
183                                                         &ctl,
184                                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
185
186         /*
187          * We build a list starting with the given rel and adding all direct and
188          * indirect children.  We can use a single list as both the record of
189          * already-found rels and the agenda of rels yet to be scanned for more
190          * children.  This is a bit tricky but works because the foreach() macro
191          * doesn't fetch the next list element until the bottom of the loop.
192          */
193         rels_list = list_make1_oid(parentrelId);
194         rel_numparents = list_make1_int(0);
195
196         foreach(l, rels_list)
197         {
198                 Oid                     currentrel = lfirst_oid(l);
199                 List       *currentchildren;
200                 ListCell   *lc;
201
202                 /* Get the direct children of this rel */
203                 currentchildren = find_inheritance_children(currentrel, lockmode);
204
205                 /*
206                  * Add to the queue only those children not already seen. This avoids
207                  * making duplicate entries in case of multiple inheritance paths from
208                  * the same parent.  (It'll also keep us from getting into an infinite
209                  * loop, though theoretically there can't be any cycles in the
210                  * inheritance graph anyway.)
211                  */
212                 foreach(lc, currentchildren)
213                 {
214                         Oid                     child_oid = lfirst_oid(lc);
215                         bool            found;
216                         SeenRelsEntry *hash_entry;
217
218                         hash_entry = hash_search(seen_rels, &child_oid, HASH_ENTER, &found);
219                         if (found)
220                         {
221                                 /* if the rel is already there, bump number-of-parents counter */
222                                 lfirst_int(hash_entry->numparents_cell)++;
223                         }
224                         else
225                         {
226                                 /* if it's not there, add it. expect 1 parent, initially. */
227                                 rels_list = lappend_oid(rels_list, child_oid);
228                                 rel_numparents = lappend_int(rel_numparents, 1);
229                                 hash_entry->numparents_cell = rel_numparents->tail;
230                         }
231                 }
232         }
233
234         if (numparents)
235                 *numparents = rel_numparents;
236         else
237                 list_free(rel_numparents);
238
239         hash_destroy(seen_rels);
240
241         return rels_list;
242 }
243
244
245 /*
246  * has_subclass - does this relation have any children?
247  *
248  * In the current implementation, has_subclass returns whether a
249  * particular class *might* have a subclass. It will not return the
250  * correct result if a class had a subclass which was later dropped.
251  * This is because relhassubclass in pg_class is not updated immediately
252  * when a subclass is dropped, primarily because of concurrency concerns.
253  *
254  * Currently has_subclass is only used as an efficiency hack to skip
255  * unnecessary inheritance searches, so this is OK.  Note that ANALYZE
256  * on a childless table will clean up the obsolete relhassubclass flag.
257  *
258  * Although this doesn't actually touch pg_inherits, it seems reasonable
259  * to keep it here since it's normally used with the other routines here.
260  */
261 bool
262 has_subclass(Oid relationId)
263 {
264         HeapTuple       tuple;
265         bool            result;
266
267         tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationId));
268         if (!HeapTupleIsValid(tuple))
269                 elog(ERROR, "cache lookup failed for relation %u", relationId);
270
271         result = ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass;
272         ReleaseSysCache(tuple);
273         return result;
274 }
275
276 /*
277  * has_superclass - does this relation inherit from another?  The caller
278  * should hold a lock on the given relation so that it can't be concurrently
279  * added to or removed from an inheritance hierarchy.
280  */
281 bool
282 has_superclass(Oid relationId)
283 {
284         Relation        catalog;
285         SysScanDesc scan;
286         ScanKeyData skey;
287         bool            result;
288
289         catalog = heap_open(InheritsRelationId, AccessShareLock);
290         ScanKeyInit(&skey, Anum_pg_inherits_inhrelid, BTEqualStrategyNumber,
291                                 F_OIDEQ, ObjectIdGetDatum(relationId));
292         scan = systable_beginscan(catalog, InheritsRelidSeqnoIndexId, true,
293                                                           NULL, 1, &skey);
294         result = HeapTupleIsValid(systable_getnext(scan));
295         systable_endscan(scan);
296         heap_close(catalog, AccessShareLock);
297
298         return result;
299 }
300
301 /*
302  * Given two type OIDs, determine whether the first is a complex type
303  * (class type) that inherits from the second.
304  *
305  * This essentially asks whether the first type is guaranteed to be coercible
306  * to the second.  Therefore, we allow the first type to be a domain over a
307  * complex type that inherits from the second; that creates no difficulties.
308  * But the second type cannot be a domain.
309  */
310 bool
311 typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
312 {
313         bool            result = false;
314         Oid                     subclassRelid;
315         Oid                     superclassRelid;
316         Relation        inhrel;
317         List       *visited,
318                            *queue;
319         ListCell   *queue_item;
320
321         /* We need to work with the associated relation OIDs */
322         subclassRelid = typeOrDomainTypeRelid(subclassTypeId);
323         if (subclassRelid == InvalidOid)
324                 return false;                   /* not a complex type or domain over one */
325         superclassRelid = typeidTypeRelid(superclassTypeId);
326         if (superclassRelid == InvalidOid)
327                 return false;                   /* not a complex type */
328
329         /* No point in searching if the superclass has no subclasses */
330         if (!has_subclass(superclassRelid))
331                 return false;
332
333         /*
334          * Begin the search at the relation itself, so add its relid to the queue.
335          */
336         queue = list_make1_oid(subclassRelid);
337         visited = NIL;
338
339         inhrel = heap_open(InheritsRelationId, AccessShareLock);
340
341         /*
342          * Use queue to do a breadth-first traversal of the inheritance graph from
343          * the relid supplied up to the root.  Notice that we append to the queue
344          * inside the loop --- this is okay because the foreach() macro doesn't
345          * advance queue_item until the next loop iteration begins.
346          */
347         foreach(queue_item, queue)
348         {
349                 Oid                     this_relid = lfirst_oid(queue_item);
350                 ScanKeyData skey;
351                 SysScanDesc inhscan;
352                 HeapTuple       inhtup;
353
354                 /*
355                  * If we've seen this relid already, skip it.  This avoids extra work
356                  * in multiple-inheritance scenarios, and also protects us from an
357                  * infinite loop in case there is a cycle in pg_inherits (though
358                  * theoretically that shouldn't happen).
359                  */
360                 if (list_member_oid(visited, this_relid))
361                         continue;
362
363                 /*
364                  * Okay, this is a not-yet-seen relid. Add it to the list of
365                  * already-visited OIDs, then find all the types this relid inherits
366                  * from and add them to the queue.
367                  */
368                 visited = lappend_oid(visited, this_relid);
369
370                 ScanKeyInit(&skey,
371                                         Anum_pg_inherits_inhrelid,
372                                         BTEqualStrategyNumber, F_OIDEQ,
373                                         ObjectIdGetDatum(this_relid));
374
375                 inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true,
376                                                                          NULL, 1, &skey);
377
378                 while ((inhtup = systable_getnext(inhscan)) != NULL)
379                 {
380                         Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup);
381                         Oid                     inhparent = inh->inhparent;
382
383                         /* If this is the target superclass, we're done */
384                         if (inhparent == superclassRelid)
385                         {
386                                 result = true;
387                                 break;
388                         }
389
390                         /* Else add to queue */
391                         queue = lappend_oid(queue, inhparent);
392                 }
393
394                 systable_endscan(inhscan);
395
396                 if (result)
397                         break;
398         }
399
400         /* clean up ... */
401         heap_close(inhrel, AccessShareLock);
402
403         list_free(visited);
404         list_free(queue);
405
406         return result;
407 }