]> granicus.if.org Git - postgresql/blob - src/backend/utils/resowner/resowner.c
Update copyright to 2004.
[postgresql] / src / backend / utils / resowner / resowner.c
1 /*-------------------------------------------------------------------------
2  *
3  * resowner.c
4  *        POSTGRES resource owner management code.
5  *
6  * Query-lifespan resources are tracked by associating them with
7  * ResourceOwner objects.  This provides a simple mechanism for ensuring
8  * that such resources are freed at the right time.
9  * See utils/resowner/README for more info.
10  *
11  *
12  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.5 2004/08/29 04:13:00 momjian Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21 #include "postgres.h"
22
23 #include "utils/resowner.h"
24 #include "access/gistscan.h"
25 #include "access/hash.h"
26 #include "access/rtree.h"
27 #include "storage/bufmgr.h"
28 #include "storage/proc.h"
29 #include "utils/memutils.h"
30 #include "utils/relcache.h"
31
32
/*
 * ResourceOwner objects look like this
 *
 * Owners are arranged as a tree: every owner records its parent, the head
 * of its own list of children, and the next sibling under the same parent.
 * For each kind of resource tracked in this file there is one growable
 * array plus a count of entries in use and the allocated array length.
 */
typedef struct ResourceOwnerData
{
	ResourceOwner parent;		/* NULL if no parent (toplevel owner) */
	ResourceOwner firstchild;	/* head of linked list of children */
	ResourceOwner nextchild;	/* next child of same parent */
	const char *name;			/* name (just for debugging); the pointer is
								 * stored as given, the string is not copied */

	/* We have built-in support for remembering owned buffers */
	int			nbuffers;		/* number of owned buffer pins */
	Buffer	   *buffers;		/* dynamically allocated array */
	int			maxbuffers;		/* currently allocated array size */

	/* We have built-in support for remembering catcache references */
	int			ncatrefs;		/* number of owned catcache pins */
	HeapTuple  *catrefs;		/* dynamically allocated array */
	int			maxcatrefs;		/* currently allocated array size */

	int			ncatlistrefs;	/* number of owned catcache-list pins */
	CatCList  **catlistrefs;	/* dynamically allocated array */
	int			maxcatlistrefs; /* currently allocated array size */

	/* We have built-in support for remembering relcache references */
	int			nrelrefs;		/* number of owned relcache pins */
	Relation   *relrefs;		/* dynamically allocated array */
	int			maxrelrefs;		/* currently allocated array size */
} ResourceOwnerData;
62
63
64 /*****************************************************************************
65  *        GLOBAL MEMORY                                                                                                                  *
66  *****************************************************************************/
67
/*
 * Owner currently in effect.  ResourceOwnerReleaseInternal temporarily
 * points this at each owner whose resources it is releasing.
 */
ResourceOwner CurrentResourceOwner = NULL;
/*
 * Not referenced by any routine in this file; presumably the owner for the
 * current (sub)transaction -- NOTE(review): confirm against the callers.
 */
ResourceOwner CurTransactionResourceOwner = NULL;
/*
 * Owner for the main transaction.  The release code compares against this
 * to recognize the top of a top-level release recursion.
 */
ResourceOwner TopTransactionResourceOwner = NULL;
71
/*
 * List of add-on callbacks for resource releasing
 *
 * This is a singly linked list.  RegisterResourceReleaseCallback pushes new
 * entries on the front; UnregisterResourceReleaseCallback removes the first
 * entry matching both the callback and its argument.
 */
typedef struct ResourceReleaseCallbackItem
{
	struct ResourceReleaseCallbackItem *next;	/* next entry, or NULL */
	ResourceReleaseCallback callback;	/* function to invoke */
	void	   *arg;			/* opaque argument handed back to callback */
} ResourceReleaseCallbackItem;

/* Head of the callback list; NULL when no callbacks are registered */
static ResourceReleaseCallbackItem *ResourceRelease_callbacks = NULL;
83
84
/* Internal routines */

/* Recursive worker that ResourceOwnerRelease runs inside its PG_TRY block */
static void ResourceOwnerReleaseInternal(ResourceOwner owner,
										 ResourceReleasePhase phase,
										 bool isCommit,
										 bool isTopLevel);
90
91
92 /*****************************************************************************
93  *        EXPORTED ROUTINES                                                                                                              *
94  *****************************************************************************/
95
96
97 /*
98  * ResourceOwnerCreate
99  *              Create an empty ResourceOwner.
100  *
101  * All ResourceOwner objects are kept in TopMemoryContext, since they should
102  * only be freed explicitly.
103  */
104 ResourceOwner
105 ResourceOwnerCreate(ResourceOwner parent, const char *name)
106 {
107         ResourceOwner owner;
108
109         owner = (ResourceOwner) MemoryContextAllocZero(TopMemoryContext,
110                                                                                                    sizeof(ResourceOwnerData));
111         owner->name = name;
112
113         if (parent)
114         {
115                 owner->parent = parent;
116                 owner->nextchild = parent->firstchild;
117                 parent->firstchild = owner;
118         }
119
120         return owner;
121 }
122
123 /*
124  * ResourceOwnerRelease
125  *              Release all resources owned by a ResourceOwner and its descendants,
126  *              but don't delete the owner objects themselves.
127  *
128  * Note that this executes just one phase of release, and so typically
129  * must be called three times.  We do it this way because (a) we want to
130  * do all the recursion separately for each phase, thereby preserving
131  * the needed order of operations; and (b) xact.c may have other operations
132  * to do between the phases.
133  *
134  * phase: release phase to execute
135  * isCommit: true for successful completion of a query or transaction,
136  *                      false for unsuccessful
137  * isTopLevel: true if completing a main transaction, else false
138  *
139  * isCommit is passed because some modules may expect that their resources
140  * were all released already if the transaction or portal finished normally.
141  * If so it is reasonable to give a warning (NOT an error) should any
142  * unreleased resources be present.  When isCommit is false, such warnings
143  * are generally inappropriate.
144  *
145  * isTopLevel is passed when we are releasing TopTransactionResourceOwner
146  * at completion of a main transaction.  This generally means that *all*
147  * resources will be released, and so we can optimize things a bit.
148  */
149 void
150 ResourceOwnerRelease(ResourceOwner owner,
151                                          ResourceReleasePhase phase,
152                                          bool isCommit,
153                                          bool isTopLevel)
154 {
155         /* Rather than PG_TRY at every level of recursion, set it up once */
156         ResourceOwner save;
157
158         save = CurrentResourceOwner;
159         PG_TRY();
160         {
161                 ResourceOwnerReleaseInternal(owner, phase, isCommit, isTopLevel);
162         }
163         PG_CATCH();
164         {
165                 CurrentResourceOwner = save;
166                 PG_RE_THROW();
167         }
168         PG_END_TRY();
169         CurrentResourceOwner = save;
170 }
171
172 static void
173 ResourceOwnerReleaseInternal(ResourceOwner owner,
174                                                          ResourceReleasePhase phase,
175                                                          bool isCommit,
176                                                          bool isTopLevel)
177 {
178         ResourceOwner child;
179         ResourceOwner save;
180         ResourceReleaseCallbackItem *item;
181
182         /* Recurse to handle descendants */
183         for (child = owner->firstchild; child != NULL; child = child->nextchild)
184                 ResourceOwnerReleaseInternal(child, phase, isCommit, isTopLevel);
185
186         /*
187          * Make CurrentResourceOwner point to me, so that ReleaseBuffer etc
188          * don't get confused.  We needn't PG_TRY here because the outermost
189          * level will fix it on error abort.
190          */
191         save = CurrentResourceOwner;
192         CurrentResourceOwner = owner;
193
194         if (phase == RESOURCE_RELEASE_BEFORE_LOCKS)
195         {
196                 /* Release buffer pins */
197                 if (isTopLevel)
198                 {
199                         /*
200                          * For a top-level xact we are going to release all buffers,
201                          * so just do a single bufmgr call at the top of the recursion.
202                          */
203                         if (owner == TopTransactionResourceOwner)
204                                 AtEOXact_Buffers(isCommit);
205                         /* Mark object as owning no buffers, just for sanity */
206                         owner->nbuffers = 0;
207                 }
208                 else
209                 {
210                         /*
211                          * Release buffers retail.  Note that ReleaseBuffer will remove
212                          * the buffer entry from my list, so I just have to iterate till
213                          * there are none.
214                          *
215                          * XXX this is fairly inefficient due to multiple BufMgrLock grabs
216                          * if there are lots of buffers to be released, but we don't
217                          * expect many (indeed none in the success case) so it's probably
218                          * not worth optimizing.
219                          *
220                          * We are however careful to release back-to-front, so as to
221                          * avoid O(N^2) behavior in ResourceOwnerForgetBuffer().
222                          */
223                         while (owner->nbuffers > 0)
224                                 ReleaseBuffer(owner->buffers[owner->nbuffers - 1]);
225                 }
226                 /* Release relcache references */
227                 if (isTopLevel)
228                 {
229                         /*
230                          * For a top-level xact we are going to release all references,
231                          * so just do a single relcache call at the top of the recursion.
232                          */
233                         if (owner == TopTransactionResourceOwner)
234                                 AtEOXact_RelationCache(isCommit);
235                         /* Mark object as owning no relrefs, just for sanity */
236                         owner->nrelrefs = 0;
237                 }
238                 else
239                 {
240                         /*
241                          * Release relcache refs retail.  Note that RelationClose will
242                          * remove the relref entry from my list, so I just have to iterate
243                          * till there are none.
244                          */
245                         while (owner->nrelrefs > 0)
246                                 RelationClose(owner->relrefs[owner->nrelrefs - 1]);
247                 }
248         }
249         else if (phase == RESOURCE_RELEASE_LOCKS)
250         {
251                 if (isTopLevel)
252                 {
253                         /*
254                          * For a top-level xact we are going to release all locks (or at
255                          * least all non-session locks), so just do a single lmgr call
256                          * at the top of the recursion.
257                          */
258                         if (owner == TopTransactionResourceOwner)
259                                 ProcReleaseLocks(isCommit);
260                 }
261                 else
262                 {
263                         /*
264                          * Release locks retail.  Note that if we are committing a
265                          * subtransaction, we do NOT release its locks yet, but transfer
266                          * them to the parent.
267                          */
268                         Assert(owner->parent != NULL);
269                         if (isCommit)
270                                 LockReassignCurrentOwner();
271                         else
272                                 LockReleaseCurrentOwner();
273                 }
274         }
275         else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
276         {
277                 /* Release catcache references */
278                 if (isTopLevel)
279                 {
280                         /*
281                          * For a top-level xact we are going to release all references,
282                          * so just do a single catcache call at the top of the recursion.
283                          */
284                         if (owner == TopTransactionResourceOwner)
285                                 AtEOXact_CatCache(isCommit);
286                         /* Mark object as owning no catrefs, just for sanity */
287                         owner->ncatrefs = 0;
288                         owner->ncatlistrefs = 0;
289                 }
290                 else
291                 {
292                         /*
293                          * Release catcache refs retail.  Note that ReleaseCatCache will
294                          * remove the catref entry from my list, so I just have to iterate
295                          * till there are none.  Ditto for catcache lists.
296                          */
297                         while (owner->ncatrefs > 0)
298                                 ReleaseCatCache(owner->catrefs[owner->ncatrefs - 1]);
299                         while (owner->ncatlistrefs > 0)
300                                 ReleaseCatCacheList(owner->catlistrefs[owner->ncatlistrefs - 1]);
301                 }
302                 /* Clean up index scans too */
303                 ReleaseResources_gist();
304                 ReleaseResources_hash();
305                 ReleaseResources_rtree();
306         }
307
308         /* Let add-on modules get a chance too */
309         for (item = ResourceRelease_callbacks; item; item = item->next)
310                 (*item->callback) (phase, isCommit, isTopLevel, item->arg);
311
312         CurrentResourceOwner = save;
313 }
314
315 /*
316  * ResourceOwnerDelete
317  *              Delete an owner object and its descendants.
318  *
319  * The caller must have already released all resources in the object tree.
320  */
321 void
322 ResourceOwnerDelete(ResourceOwner owner)
323 {
324         /* We had better not be deleting CurrentResourceOwner ... */
325         Assert(owner != CurrentResourceOwner);
326
327         /* And it better not own any resources, either */
328         Assert(owner->nbuffers == 0);
329         Assert(owner->ncatrefs == 0);
330         Assert(owner->ncatlistrefs == 0);
331         Assert(owner->nrelrefs == 0);
332
333         /*
334          * Delete children.  The recursive call will delink the child
335          * from me, so just iterate as long as there is a child.
336          */
337         while (owner->firstchild != NULL)
338                 ResourceOwnerDelete(owner->firstchild);
339
340         /*
341          * We delink the owner from its parent before deleting it, so that
342          * if there's an error we won't have deleted/busted owners still
343          * attached to the owner tree.  Better a leak than a crash.
344          */
345         ResourceOwnerNewParent(owner, NULL);
346
347         /* And free the object. */
348         if (owner->buffers)
349                 pfree(owner->buffers);
350         if (owner->catrefs)
351                 pfree(owner->catrefs);
352         if (owner->catlistrefs)
353                 pfree(owner->catlistrefs);
354         if (owner->relrefs)
355                 pfree(owner->relrefs);
356
357         pfree(owner);
358 }
359
360 /*
361  * Fetch parent of a ResourceOwner (returns NULL if top-level owner)
362  */
363 ResourceOwner
364 ResourceOwnerGetParent(ResourceOwner owner)
365 {
366         return owner->parent;
367 }
368
369 /*
370  * Reassign a ResourceOwner to have a new parent
371  */
372 void
373 ResourceOwnerNewParent(ResourceOwner owner,
374                                            ResourceOwner newparent)
375 {
376         ResourceOwner oldparent = owner->parent;
377
378         if (oldparent)
379         {
380                 if (owner == oldparent->firstchild)
381                         oldparent->firstchild = owner->nextchild;
382                 else
383                 {
384                         ResourceOwner child;
385
386                         for (child = oldparent->firstchild; child; child = child->nextchild)
387                         {
388                                 if (owner == child->nextchild)
389                                 {
390                                         child->nextchild = owner->nextchild;
391                                         break;
392                                 }
393                         }
394                 }
395         }
396
397         if (newparent)
398         {
399                 Assert(owner != newparent);
400                 owner->parent = newparent;
401                 owner->nextchild = newparent->firstchild;
402                 newparent->firstchild = owner;
403         }
404         else
405         {
406                 owner->parent = NULL;
407                 owner->nextchild = NULL;
408         }
409 }
410
411 /*
412  * Register or deregister callback functions for resource cleanup
413  *
414  * These functions are intended for use by dynamically loaded modules.
415  * For built-in modules we generally just hardwire the appropriate calls.
416  *
417  * Note that the callback occurs post-commit or post-abort, so the callback
418  * functions can only do noncritical cleanup.
419  */
420 void
421 RegisterResourceReleaseCallback(ResourceReleaseCallback callback, void *arg)
422 {
423         ResourceReleaseCallbackItem *item;
424
425         item = (ResourceReleaseCallbackItem *)
426                 MemoryContextAlloc(TopMemoryContext,
427                                                    sizeof(ResourceReleaseCallbackItem));
428         item->callback = callback;
429         item->arg = arg;
430         item->next = ResourceRelease_callbacks;
431         ResourceRelease_callbacks = item;
432 }
433
434 void
435 UnregisterResourceReleaseCallback(ResourceReleaseCallback callback, void *arg)
436 {
437         ResourceReleaseCallbackItem *item;
438         ResourceReleaseCallbackItem *prev;
439
440         prev = NULL;
441         for (item = ResourceRelease_callbacks; item; prev = item, item = item->next)
442         {
443                 if (item->callback == callback && item->arg == arg)
444                 {
445                         if (prev)
446                                 prev->next = item->next;
447                         else
448                                 ResourceRelease_callbacks = item->next;
449                         pfree(item);
450                         break;
451                 }
452         }
453 }
454
455
456 /*
457  * Make sure there is room for at least one more entry in a ResourceOwner's
458  * buffer array.
459  *
460  * This is separate from actually inserting an entry because if we run out
461  * of memory, it's critical to do so *before* acquiring the resource.
462  *
463  * We allow the case owner == NULL because the bufmgr is sometimes invoked
464  * outside any transaction (for example, in the bgwriter).
465  */
466 void
467 ResourceOwnerEnlargeBuffers(ResourceOwner owner)
468 {
469         int                     newmax;
470
471         if (owner == NULL ||
472                 owner->nbuffers < owner->maxbuffers)
473                 return;                                 /* nothing to do */
474
475         if (owner->buffers == NULL)
476         {
477                 newmax = 16;
478                 owner->buffers = (Buffer *)
479                         MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer));
480                 owner->maxbuffers = newmax;
481         }
482         else
483         {
484                 newmax = owner->maxbuffers * 2;
485                 owner->buffers = (Buffer *)
486                         repalloc(owner->buffers, newmax * sizeof(Buffer));
487                 owner->maxbuffers = newmax;
488         }
489 }
490
491 /*
492  * Remember that a buffer pin is owned by a ResourceOwner
493  *
494  * Caller must have previously done ResourceOwnerEnlargeBuffers()
495  *
496  * We allow the case owner == NULL because the bufmgr is sometimes invoked
497  * outside any transaction (for example, in the bgwriter).
498  */
499 void
500 ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer)
501 {
502         if (owner != NULL)
503         {
504                 Assert(owner->nbuffers < owner->maxbuffers);
505                 owner->buffers[owner->nbuffers] = buffer;
506                 owner->nbuffers++;
507         }
508 }
509
510 /*
511  * Forget that a buffer pin is owned by a ResourceOwner
512  *
513  * We allow the case owner == NULL because the bufmgr is sometimes invoked
514  * outside any transaction (for example, in the bgwriter).
515  */
516 void
517 ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
518 {
519         if (owner != NULL)
520         {
521                 Buffer     *buffers = owner->buffers;
522                 int                     nb1 = owner->nbuffers - 1;
523                 int                     i;
524
525                 /*
526                  * Scan back-to-front because it's more likely we are releasing
527                  * a recently pinned buffer.  This isn't always the case of course,
528                  * but it's the way to bet.
529                  */
530                 for (i = nb1; i >= 0; i--)
531                 {
532                         if (buffers[i] == buffer)
533                         {
534                                 while (i < nb1)
535                                 {
536                                         buffers[i] = buffers[i + 1];
537                                         i++;
538                                 }
539                                 owner->nbuffers = nb1;
540                                 return;
541                         }
542                 }
543                 elog(ERROR, "buffer %d is not owned by resource owner %s",
544                          buffer, owner->name);
545         }
546 }
547
548 /*
549  * Make sure there is room for at least one more entry in a ResourceOwner's
550  * catcache reference array.
551  *
552  * This is separate from actually inserting an entry because if we run out
553  * of memory, it's critical to do so *before* acquiring the resource.
554  */
555 void
556 ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner)
557 {
558         int                     newmax;
559
560         if (owner->ncatrefs < owner->maxcatrefs)
561                 return;                                 /* nothing to do */
562
563         if (owner->catrefs == NULL)
564         {
565                 newmax = 16;
566                 owner->catrefs = (HeapTuple *)
567                         MemoryContextAlloc(TopMemoryContext, newmax * sizeof(HeapTuple));
568                 owner->maxcatrefs = newmax;
569         }
570         else
571         {
572                 newmax = owner->maxcatrefs * 2;
573                 owner->catrefs = (HeapTuple *)
574                         repalloc(owner->catrefs, newmax * sizeof(HeapTuple));
575                 owner->maxcatrefs = newmax;
576         }
577 }
578
579 /*
580  * Remember that a catcache reference is owned by a ResourceOwner
581  *
582  * Caller must have previously done ResourceOwnerEnlargeCatCacheRefs()
583  */
584 void
585 ResourceOwnerRememberCatCacheRef(ResourceOwner owner, HeapTuple tuple)
586 {
587         Assert(owner->ncatrefs < owner->maxcatrefs);
588         owner->catrefs[owner->ncatrefs] = tuple;
589         owner->ncatrefs++;
590 }
591
592 /*
593  * Forget that a catcache reference is owned by a ResourceOwner
594  */
595 void
596 ResourceOwnerForgetCatCacheRef(ResourceOwner owner, HeapTuple tuple)
597 {
598         HeapTuple  *catrefs = owner->catrefs;
599         int                     nc1 = owner->ncatrefs - 1;
600         int                     i;
601
602         for (i = nc1; i >= 0; i--)
603         {
604                 if (catrefs[i] == tuple)
605                 {
606                         while (i < nc1)
607                         {
608                                 catrefs[i] = catrefs[i + 1];
609                                 i++;
610                         }
611                         owner->ncatrefs = nc1;
612                         return;
613                 }
614         }
615         elog(ERROR, "catcache reference %p is not owned by resource owner %s",
616                  tuple, owner->name);
617 }
618
619 /*
620  * Make sure there is room for at least one more entry in a ResourceOwner's
621  * catcache-list reference array.
622  *
623  * This is separate from actually inserting an entry because if we run out
624  * of memory, it's critical to do so *before* acquiring the resource.
625  */
626 void
627 ResourceOwnerEnlargeCatCacheListRefs(ResourceOwner owner)
628 {
629         int                     newmax;
630
631         if (owner->ncatlistrefs < owner->maxcatlistrefs)
632                 return;                                 /* nothing to do */
633
634         if (owner->catlistrefs == NULL)
635         {
636                 newmax = 16;
637                 owner->catlistrefs = (CatCList **)
638                         MemoryContextAlloc(TopMemoryContext, newmax * sizeof(CatCList *));
639                 owner->maxcatlistrefs = newmax;
640         }
641         else
642         {
643                 newmax = owner->maxcatlistrefs * 2;
644                 owner->catlistrefs = (CatCList **)
645                         repalloc(owner->catlistrefs, newmax * sizeof(CatCList *));
646                 owner->maxcatlistrefs = newmax;
647         }
648 }
649
650 /*
651  * Remember that a catcache-list reference is owned by a ResourceOwner
652  *
653  * Caller must have previously done ResourceOwnerEnlargeCatCacheListRefs()
654  */
655 void
656 ResourceOwnerRememberCatCacheListRef(ResourceOwner owner, CatCList *list)
657 {
658         Assert(owner->ncatlistrefs < owner->maxcatlistrefs);
659         owner->catlistrefs[owner->ncatlistrefs] = list;
660         owner->ncatlistrefs++;
661 }
662
663 /*
664  * Forget that a catcache-list reference is owned by a ResourceOwner
665  */
666 void
667 ResourceOwnerForgetCatCacheListRef(ResourceOwner owner, CatCList *list)
668 {
669         CatCList  **catlistrefs = owner->catlistrefs;
670         int                     nc1 = owner->ncatlistrefs - 1;
671         int                     i;
672
673         for (i = nc1; i >= 0; i--)
674         {
675                 if (catlistrefs[i] == list)
676                 {
677                         while (i < nc1)
678                         {
679                                 catlistrefs[i] = catlistrefs[i + 1];
680                                 i++;
681                         }
682                         owner->ncatlistrefs = nc1;
683                         return;
684                 }
685         }
686         elog(ERROR, "catcache list reference %p is not owned by resource owner %s",
687                  list, owner->name);
688 }
689
690 /*
691  * Make sure there is room for at least one more entry in a ResourceOwner's
692  * relcache reference array.
693  *
694  * This is separate from actually inserting an entry because if we run out
695  * of memory, it's critical to do so *before* acquiring the resource.
696  */
697 void
698 ResourceOwnerEnlargeRelationRefs(ResourceOwner owner)
699 {
700         int                     newmax;
701
702         if (owner->nrelrefs < owner->maxrelrefs)
703                 return;                                 /* nothing to do */
704
705         if (owner->relrefs == NULL)
706         {
707                 newmax = 16;
708                 owner->relrefs = (Relation *)
709                         MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Relation));
710                 owner->maxrelrefs = newmax;
711         }
712         else
713         {
714                 newmax = owner->maxrelrefs * 2;
715                 owner->relrefs = (Relation *)
716                         repalloc(owner->relrefs, newmax * sizeof(Relation));
717                 owner->maxrelrefs = newmax;
718         }
719 }
720
721 /*
722  * Remember that a relcache reference is owned by a ResourceOwner
723  *
724  * Caller must have previously done ResourceOwnerEnlargeRelationRefs()
725  */
726 void
727 ResourceOwnerRememberRelationRef(ResourceOwner owner, Relation rel)
728 {
729         Assert(owner->nrelrefs < owner->maxrelrefs);
730         owner->relrefs[owner->nrelrefs] = rel;
731         owner->nrelrefs++;
732 }
733
734 /*
735  * Forget that a relcache reference is owned by a ResourceOwner
736  */
737 void
738 ResourceOwnerForgetRelationRef(ResourceOwner owner, Relation rel)
739 {
740         Relation   *relrefs = owner->relrefs;
741         int                     nr1 = owner->nrelrefs - 1;
742         int                     i;
743
744         for (i = nr1; i >= 0; i--)
745         {
746                 if (relrefs[i] == rel)
747                 {
748                         while (i < nr1)
749                         {
750                                 relrefs[i] = relrefs[i + 1];
751                                 i++;
752                         }
753                         owner->nrelrefs = nr1;
754                         return;
755                 }
756         }
757         elog(ERROR, "relcache reference %s is not owned by resource owner %s",
758                  RelationGetRelationName(rel), owner->name);
759 }