]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlogutils.c
Clean up WAL/buffer interactions as per my recent proposal. Get rid of the
[postgresql] / src / backend / access / transam / xlogutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogutils.c
4  *
5  * PostgreSQL transaction log manager utility routines
6  *
7  * This file contains support routines that are used by XLOG replay functions.
8  * None of this code is used during normal system operation.
9  *
10  *
11  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.43 2006/03/31 23:32:06 tgl Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/xlogutils.h"
21 #include "storage/bufmgr.h"
22 #include "storage/bufpage.h"
23 #include "storage/smgr.h"
24 #include "utils/hsearch.h"
25
26
27 /*
28  * XLogReadBuffer
29  *              Read a page during XLOG replay
30  *
31  * This is functionally comparable to ReadBuffer followed by
32  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
33  * and locked buffer.  (Getting the lock is not really necessary, since we
34  * expect that this is only used during single-process XLOG replay, but
35  * some subroutines such as MarkBufferDirty will complain if we don't.)
36  *
37  * If "init" is true then the caller intends to rewrite the page fully
38  * using the info in the XLOG record.  In this case we will extend the
39  * relation if needed to make the page exist, and we will not complain about
40  * the page being "new" (all zeroes).
41  *
42  * If "init" is false then the caller needs the page to be valid already.
43  * If the page doesn't exist or contains zeroes, we report failure.
44  *
45  * If the return value is InvalidBuffer (only possible when init = false),
46  * the caller should silently skip the update on this page.  This currently
47  * never happens, but we retain it as part of the API spec for possible future
48  * use.
49  */
50 Buffer
51 XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
52 {
53         BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
54         Buffer          buffer;
55
56         Assert(blkno != P_NEW);
57
58         if (blkno < lastblock)
59         {
60                 /* page exists in file */
61                 buffer = ReadBuffer(reln, blkno);
62         }
63         else
64         {
65                 /* hm, page doesn't exist in file */
66                 if (!init)
67                         elog(PANIC, "block %u of relation %u/%u/%u does not exist",
68                                  blkno, reln->rd_node.spcNode,
69                                  reln->rd_node.dbNode, reln->rd_node.relNode);
70                 /* OK to extend the file */
71                 /* we do this in recovery only - no rel-extension lock needed */
72                 Assert(InRecovery);
73                 buffer = InvalidBuffer;
74                 while (blkno >= lastblock)
75                 {
76                         if (buffer != InvalidBuffer)
77                                 ReleaseBuffer(buffer);
78                         buffer = ReadBuffer(reln, P_NEW);
79                         lastblock++;
80                 }
81                 Assert(BufferGetBlockNumber(buffer) == blkno);
82         }
83
84         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
85
86         if (!init)
87         {
88                 /* check that page has been initialized */
89                 Page    page = (Page) BufferGetPage(buffer);
90
91                 if (PageIsNew((PageHeader) page))
92                         elog(PANIC, "block %u of relation %u/%u/%u is uninitialized",
93                                  blkno, reln->rd_node.spcNode,
94                                  reln->rd_node.dbNode, reln->rd_node.relNode);
95         }
96
97         return buffer;
98 }
99
100
101 /*
102  * Lightweight "Relation" cache --- this substitutes for the normal relcache
103  * during XLOG replay.
104  */
105
106 typedef struct XLogRelDesc
107 {
108         RelationData reldata;
109         struct XLogRelDesc *lessRecently;
110         struct XLogRelDesc *moreRecently;
111 } XLogRelDesc;
112
113 typedef struct XLogRelCacheEntry
114 {
115         RelFileNode rnode;
116         XLogRelDesc *rdesc;
117 } XLogRelCacheEntry;
118
119 static HTAB *_xlrelcache;
120 static XLogRelDesc *_xlrelarr = NULL;
121 static Form_pg_class _xlpgcarr = NULL;
122 static int      _xlast = 0;
123 static int      _xlcnt = 0;
124
125 #define _XLOG_RELCACHESIZE      512
126
127 static void
128 _xl_init_rel_cache(void)
129 {
130         HASHCTL         ctl;
131
132         _xlcnt = _XLOG_RELCACHESIZE;
133         _xlast = 0;
134         _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
135         memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
136         _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
137         memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
138
139         _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
140         _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
141
142         memset(&ctl, 0, sizeof(ctl));
143         ctl.keysize = sizeof(RelFileNode);
144         ctl.entrysize = sizeof(XLogRelCacheEntry);
145         ctl.hash = tag_hash;
146
147         _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
148                                                           &ctl, HASH_ELEM | HASH_FUNCTION);
149 }
150
151 static void
152 _xl_remove_hash_entry(XLogRelDesc *rdesc)
153 {
154         Form_pg_class tpgc = rdesc->reldata.rd_rel;
155         XLogRelCacheEntry *hentry;
156
157         rdesc->lessRecently->moreRecently = rdesc->moreRecently;
158         rdesc->moreRecently->lessRecently = rdesc->lessRecently;
159
160         hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
161                                           (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
162         if (hentry == NULL)
163                 elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
164
165         RelationCloseSmgr(&(rdesc->reldata));
166
167         memset(rdesc, 0, sizeof(XLogRelDesc));
168         memset(tpgc, 0, sizeof(FormData_pg_class));
169         rdesc->reldata.rd_rel = tpgc;
170 }
171
172 static XLogRelDesc *
173 _xl_new_reldesc(void)
174 {
175         XLogRelDesc *res;
176
177         _xlast++;
178         if (_xlast < _xlcnt)
179         {
180                 _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
181                 return &(_xlrelarr[_xlast]);
182         }
183
184         /* reuse */
185         res = _xlrelarr[0].moreRecently;
186
187         _xl_remove_hash_entry(res);
188
189         _xlast--;
190         return res;
191 }
192
193
/*
 * XLogInitRelationCache
 *		Set up the XLOG relation cache
 *
 * Public entry point; all the work is done by _xl_init_rel_cache().
 */
void
XLogInitRelationCache(void)
{
	_xl_init_rel_cache();
}
199
200 void
201 XLogCloseRelationCache(void)
202 {
203         HASH_SEQ_STATUS status;
204         XLogRelCacheEntry *hentry;
205
206         if (!_xlrelarr)
207                 return;
208
209         hash_seq_init(&status, _xlrelcache);
210
211         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
212                 _xl_remove_hash_entry(hentry->rdesc);
213
214         hash_destroy(_xlrelcache);
215
216         free(_xlrelarr);
217         free(_xlpgcarr);
218
219         _xlrelarr = NULL;
220 }
221
222 /*
223  * Open a relation during XLOG replay
224  *
225  * Note: this once had an API that allowed NULL return on failure, but it
226  * no longer does; any failure results in elog().
227  */
228 Relation
229 XLogOpenRelation(RelFileNode rnode)
230 {
231         XLogRelDesc *res;
232         XLogRelCacheEntry *hentry;
233         bool            found;
234
235         hentry = (XLogRelCacheEntry *)
236                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
237
238         if (hentry)
239         {
240                 res = hentry->rdesc;
241
242                 res->lessRecently->moreRecently = res->moreRecently;
243                 res->moreRecently->lessRecently = res->lessRecently;
244         }
245         else
246         {
247                 res = _xl_new_reldesc();
248
249                 sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
250
251                 res->reldata.rd_node = rnode;
252
253                 /*
254                  * We set up the lockRelId in case anything tries to lock the dummy
255                  * relation.  Note that this is fairly bogus since relNode may be
256                  * different from the relation's OID.  It shouldn't really matter
257                  * though, since we are presumably running by ourselves and can't have
258                  * any lock conflicts ...
259                  */
260                 res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
261                 res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
262
263                 hentry = (XLogRelCacheEntry *)
264                         hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
265
266                 if (found)
267                         elog(PANIC, "xlog relation already present on insert into cache");
268
269                 hentry->rdesc = res;
270
271                 res->reldata.rd_targblock = InvalidBlockNumber;
272                 res->reldata.rd_smgr = NULL;
273                 RelationOpenSmgr(&(res->reldata));
274
275                 /*
276                  * Create the target file if it doesn't already exist.  This lets us
277                  * cope if the replay sequence contains writes to a relation that is
278                  * later deleted.  (The original coding of this routine would instead
279                  * return NULL, causing the writes to be suppressed. But that seems
280                  * like it risks losing valuable data if the filesystem loses an inode
281                  * during a crash.      Better to write the data until we are actually
282                  * told to delete the file.)
283                  */
284                 smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
285         }
286
287         res->moreRecently = &(_xlrelarr[0]);
288         res->lessRecently = _xlrelarr[0].lessRecently;
289         _xlrelarr[0].lessRecently = res;
290         res->lessRecently->moreRecently = res;
291
292         return &(res->reldata);
293 }
294
295 /*
296  * Drop a relation during XLOG replay
297  *
298  * This is called when the relation is about to be deleted; we need to ensure
299  * that there is no dangling smgr reference in the xlog relation cache.
300  *
301  * Currently, we don't bother to physically remove the relation from the
302  * cache, we just let it age out normally.
303  */
304 void
305 XLogDropRelation(RelFileNode rnode)
306 {
307         XLogRelDesc *rdesc;
308         XLogRelCacheEntry *hentry;
309
310         hentry = (XLogRelCacheEntry *)
311                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
312
313         if (!hentry)
314                 return;                                 /* not in cache so no work */
315
316         rdesc = hentry->rdesc;
317
318         RelationCloseSmgr(&(rdesc->reldata));
319 }
320
321 /*
322  * Drop a whole database during XLOG replay
323  *
324  * As above, but for DROP DATABASE instead of dropping a single rel
325  */
326 void
327 XLogDropDatabase(Oid dbid)
328 {
329         HASH_SEQ_STATUS status;
330         XLogRelCacheEntry *hentry;
331
332         hash_seq_init(&status, _xlrelcache);
333
334         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
335         {
336                 XLogRelDesc *rdesc = hentry->rdesc;
337
338                 if (hentry->rnode.dbNode == dbid)
339                         RelationCloseSmgr(&(rdesc->reldata));
340         }
341 }