]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlogutils.c
Update CVS HEAD for 2007 copyright. Back branches are typically not
[postgresql] / src / backend / access / transam / xlogutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogutils.c
4  *
5  * PostgreSQL transaction log manager utility routines
6  *
7  * This file contains support routines that are used by XLOG replay functions.
8  * None of this code is used during normal system operation.
9  *
10  *
11  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.49 2007/01/05 22:19:24 momjian Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/xlogutils.h"
21 #include "storage/bufpage.h"
22 #include "storage/smgr.h"
23 #include "utils/hsearch.h"
24
25
26 /*
27  * During XLOG replay, we may see XLOG records for incremental updates of
28  * pages that no longer exist, because their relation was later dropped or
29  * truncated.  (Note: this is only possible when full_page_writes = OFF,
30  * since when it's ON, the first reference we see to a page should always
31  * be a full-page rewrite not an incremental update.)  Rather than simply
32  * ignoring such records, we make a note of the referenced page, and then
33  * complain if we don't actually see a drop or truncate covering the page
34  * later in replay.
35  */
36 typedef struct xl_invalid_page_key
37 {
38         RelFileNode node;                       /* the relation */
39         BlockNumber blkno;                      /* the page */
40 } xl_invalid_page_key;
41
42 typedef struct xl_invalid_page
43 {
44         xl_invalid_page_key key;        /* hash key ... must be first */
45         bool            present;                /* page existed but contained zeroes */
46 } xl_invalid_page;
47
48 static HTAB *invalid_page_tab = NULL;
49
50
51 /* Log a reference to an invalid page */
52 static void
53 log_invalid_page(RelFileNode node, BlockNumber blkno, bool present)
54 {
55         xl_invalid_page_key key;
56         xl_invalid_page *hentry;
57         bool            found;
58
59         /*
60          * Log references to invalid pages at DEBUG1 level.  This allows some
61          * tracing of the cause (note the elog context mechanism will tell us
62          * something about the XLOG record that generated the reference).
63          */
64         if (present)
65                 elog(DEBUG1, "page %u of relation %u/%u/%u is uninitialized",
66                          blkno, node.spcNode, node.dbNode, node.relNode);
67         else
68                 elog(DEBUG1, "page %u of relation %u/%u/%u does not exist",
69                          blkno, node.spcNode, node.dbNode, node.relNode);
70
71         if (invalid_page_tab == NULL)
72         {
73                 /* create hash table when first needed */
74                 HASHCTL         ctl;
75
76                 memset(&ctl, 0, sizeof(ctl));
77                 ctl.keysize = sizeof(xl_invalid_page_key);
78                 ctl.entrysize = sizeof(xl_invalid_page);
79                 ctl.hash = tag_hash;
80
81                 invalid_page_tab = hash_create("XLOG invalid-page table",
82                                                                            100,
83                                                                            &ctl,
84                                                                            HASH_ELEM | HASH_FUNCTION);
85         }
86
87         /* we currently assume xl_invalid_page_key contains no padding */
88         key.node = node;
89         key.blkno = blkno;
90         hentry = (xl_invalid_page *)
91                 hash_search(invalid_page_tab, (void *) &key, HASH_ENTER, &found);
92
93         if (!found)
94         {
95                 /* hash_search already filled in the key */
96                 hentry->present = present;
97         }
98         else
99         {
100                 /* repeat reference ... leave "present" as it was */
101         }
102 }
103
104 /* Forget any invalid pages >= minblkno, because they've been dropped */
105 static void
106 forget_invalid_pages(RelFileNode node, BlockNumber minblkno)
107 {
108         HASH_SEQ_STATUS status;
109         xl_invalid_page *hentry;
110
111         if (invalid_page_tab == NULL)
112                 return;                                 /* nothing to do */
113
114         hash_seq_init(&status, invalid_page_tab);
115
116         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
117         {
118                 if (RelFileNodeEquals(hentry->key.node, node) &&
119                         hentry->key.blkno >= minblkno)
120                 {
121                         elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
122                                  hentry->key.blkno, hentry->key.node.spcNode,
123                                  hentry->key.node.dbNode, hentry->key.node.relNode);
124
125                         if (hash_search(invalid_page_tab,
126                                                         (void *) &hentry->key,
127                                                         HASH_REMOVE, NULL) == NULL)
128                                 elog(ERROR, "hash table corrupted");
129                 }
130         }
131 }
132
133 /* Forget any invalid pages in a whole database */
134 static void
135 forget_invalid_pages_db(Oid dbid)
136 {
137         HASH_SEQ_STATUS status;
138         xl_invalid_page *hentry;
139
140         if (invalid_page_tab == NULL)
141                 return;                                 /* nothing to do */
142
143         hash_seq_init(&status, invalid_page_tab);
144
145         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
146         {
147                 if (hentry->key.node.dbNode == dbid)
148                 {
149                         elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
150                                  hentry->key.blkno, hentry->key.node.spcNode,
151                                  hentry->key.node.dbNode, hentry->key.node.relNode);
152
153                         if (hash_search(invalid_page_tab,
154                                                         (void *) &hentry->key,
155                                                         HASH_REMOVE, NULL) == NULL)
156                                 elog(ERROR, "hash table corrupted");
157                 }
158         }
159 }
160
161 /* Complain about any remaining invalid-page entries */
162 void
163 XLogCheckInvalidPages(void)
164 {
165         HASH_SEQ_STATUS status;
166         xl_invalid_page *hentry;
167         bool            foundone = false;
168
169         if (invalid_page_tab == NULL)
170                 return;                                 /* nothing to do */
171
172         hash_seq_init(&status, invalid_page_tab);
173
174         /*
175          * Our strategy is to emit WARNING messages for all remaining entries and
176          * only PANIC after we've dumped all the available info.
177          */
178         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
179         {
180                 if (hentry->present)
181                         elog(WARNING, "page %u of relation %u/%u/%u was uninitialized",
182                                  hentry->key.blkno, hentry->key.node.spcNode,
183                                  hentry->key.node.dbNode, hentry->key.node.relNode);
184                 else
185                         elog(WARNING, "page %u of relation %u/%u/%u did not exist",
186                                  hentry->key.blkno, hentry->key.node.spcNode,
187                                  hentry->key.node.dbNode, hentry->key.node.relNode);
188                 foundone = true;
189         }
190
191         if (foundone)
192                 elog(PANIC, "WAL contains references to invalid pages");
193 }
194
195
196 /*
197  * XLogReadBuffer
198  *              Read a page during XLOG replay
199  *
200  * This is functionally comparable to ReadBuffer followed by
201  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
202  * and locked buffer.  (Getting the lock is not really necessary, since we
203  * expect that this is only used during single-process XLOG replay, but
204  * some subroutines such as MarkBufferDirty will complain if we don't.)
205  *
206  * If "init" is true then the caller intends to rewrite the page fully
207  * using the info in the XLOG record.  In this case we will extend the
208  * relation if needed to make the page exist, and we will not complain about
209  * the page being "new" (all zeroes).
210  *
211  * If "init" is false then the caller needs the page to be valid already.
212  * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
213  * In this case the caller should silently skip the update on this page.
214  * (In this situation, we expect that the page was later dropped or truncated.
215  * If we don't see evidence of that later in the WAL sequence, we'll complain
216  * at the end of WAL replay.)
217  */
218 Buffer
219 XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
220 {
221         BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
222         Buffer          buffer;
223
224         Assert(blkno != P_NEW);
225
226         if (blkno < lastblock)
227         {
228                 /* page exists in file */
229                 buffer = ReadBuffer(reln, blkno);
230         }
231         else
232         {
233                 /* hm, page doesn't exist in file */
234                 if (!init)
235                 {
236                         log_invalid_page(reln->rd_node, blkno, false);
237                         return InvalidBuffer;
238                 }
239                 /* OK to extend the file */
240                 /* we do this in recovery only - no rel-extension lock needed */
241                 Assert(InRecovery);
242                 buffer = InvalidBuffer;
243                 while (blkno >= lastblock)
244                 {
245                         if (buffer != InvalidBuffer)
246                                 ReleaseBuffer(buffer);
247                         buffer = ReadBuffer(reln, P_NEW);
248                         lastblock++;
249                 }
250                 Assert(BufferGetBlockNumber(buffer) == blkno);
251         }
252
253         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
254
255         if (!init)
256         {
257                 /* check that page has been initialized */
258                 Page            page = (Page) BufferGetPage(buffer);
259
260                 if (PageIsNew((PageHeader) page))
261                 {
262                         UnlockReleaseBuffer(buffer);
263                         log_invalid_page(reln->rd_node, blkno, true);
264                         return InvalidBuffer;
265                 }
266         }
267
268         return buffer;
269 }
270
271
272 /*
273  * Lightweight "Relation" cache --- this substitutes for the normal relcache
274  * during XLOG replay.
275  */
276
277 typedef struct XLogRelDesc
278 {
279         RelationData reldata;
280         struct XLogRelDesc *lessRecently;
281         struct XLogRelDesc *moreRecently;
282 } XLogRelDesc;
283
284 typedef struct XLogRelCacheEntry
285 {
286         RelFileNode rnode;
287         XLogRelDesc *rdesc;
288 } XLogRelCacheEntry;
289
290 static HTAB *_xlrelcache;
291 static XLogRelDesc *_xlrelarr = NULL;
292 static Form_pg_class _xlpgcarr = NULL;
293 static int      _xlast = 0;
294 static int      _xlcnt = 0;
295
296 #define _XLOG_RELCACHESIZE      512
297
298 static void
299 _xl_init_rel_cache(void)
300 {
301         HASHCTL         ctl;
302
303         _xlcnt = _XLOG_RELCACHESIZE;
304         _xlast = 0;
305         _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
306         memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
307         _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
308         memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
309
310         _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
311         _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
312
313         memset(&ctl, 0, sizeof(ctl));
314         ctl.keysize = sizeof(RelFileNode);
315         ctl.entrysize = sizeof(XLogRelCacheEntry);
316         ctl.hash = tag_hash;
317
318         _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
319                                                           &ctl, HASH_ELEM | HASH_FUNCTION);
320 }
321
322 static void
323 _xl_remove_hash_entry(XLogRelDesc *rdesc)
324 {
325         Form_pg_class tpgc = rdesc->reldata.rd_rel;
326         XLogRelCacheEntry *hentry;
327
328         rdesc->lessRecently->moreRecently = rdesc->moreRecently;
329         rdesc->moreRecently->lessRecently = rdesc->lessRecently;
330
331         hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
332                                           (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
333         if (hentry == NULL)
334                 elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
335
336         RelationCloseSmgr(&(rdesc->reldata));
337
338         memset(rdesc, 0, sizeof(XLogRelDesc));
339         memset(tpgc, 0, sizeof(FormData_pg_class));
340         rdesc->reldata.rd_rel = tpgc;
341 }
342
343 static XLogRelDesc *
344 _xl_new_reldesc(void)
345 {
346         XLogRelDesc *res;
347
348         _xlast++;
349         if (_xlast < _xlcnt)
350         {
351                 _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
352                 return &(_xlrelarr[_xlast]);
353         }
354
355         /* reuse */
356         res = _xlrelarr[0].moreRecently;
357
358         _xl_remove_hash_entry(res);
359
360         _xlast--;
361         return res;
362 }
363
364
365 void
366 XLogInitRelationCache(void)
367 {
368         _xl_init_rel_cache();
369         invalid_page_tab = NULL;
370 }
371
372 void
373 XLogCloseRelationCache(void)
374 {
375         HASH_SEQ_STATUS status;
376         XLogRelCacheEntry *hentry;
377
378         if (!_xlrelarr)
379                 return;
380
381         hash_seq_init(&status, _xlrelcache);
382
383         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
384                 _xl_remove_hash_entry(hentry->rdesc);
385
386         hash_destroy(_xlrelcache);
387
388         free(_xlrelarr);
389         free(_xlpgcarr);
390
391         _xlrelarr = NULL;
392 }
393
394 /*
395  * Open a relation during XLOG replay
396  *
397  * Note: this once had an API that allowed NULL return on failure, but it
398  * no longer does; any failure results in elog().
399  */
400 Relation
401 XLogOpenRelation(RelFileNode rnode)
402 {
403         XLogRelDesc *res;
404         XLogRelCacheEntry *hentry;
405         bool            found;
406
407         hentry = (XLogRelCacheEntry *)
408                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
409
410         if (hentry)
411         {
412                 res = hentry->rdesc;
413
414                 res->lessRecently->moreRecently = res->moreRecently;
415                 res->moreRecently->lessRecently = res->lessRecently;
416         }
417         else
418         {
419                 res = _xl_new_reldesc();
420
421                 sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
422
423                 res->reldata.rd_node = rnode;
424
425                 /*
426                  * We set up the lockRelId in case anything tries to lock the dummy
427                  * relation.  Note that this is fairly bogus since relNode may be
428                  * different from the relation's OID.  It shouldn't really matter
429                  * though, since we are presumably running by ourselves and can't have
430                  * any lock conflicts ...
431                  */
432                 res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
433                 res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
434
435                 hentry = (XLogRelCacheEntry *)
436                         hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
437
438                 if (found)
439                         elog(PANIC, "xlog relation already present on insert into cache");
440
441                 hentry->rdesc = res;
442
443                 res->reldata.rd_targblock = InvalidBlockNumber;
444                 res->reldata.rd_smgr = NULL;
445                 RelationOpenSmgr(&(res->reldata));
446
447                 /*
448                  * Create the target file if it doesn't already exist.  This lets us
449                  * cope if the replay sequence contains writes to a relation that is
450                  * later deleted.  (The original coding of this routine would instead
451                  * return NULL, causing the writes to be suppressed. But that seems
452                  * like it risks losing valuable data if the filesystem loses an inode
453                  * during a crash.      Better to write the data until we are actually
454                  * told to delete the file.)
455                  */
456                 smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
457         }
458
459         res->moreRecently = &(_xlrelarr[0]);
460         res->lessRecently = _xlrelarr[0].lessRecently;
461         _xlrelarr[0].lessRecently = res;
462         res->lessRecently->moreRecently = res;
463
464         return &(res->reldata);
465 }
466
467 /*
468  * Drop a relation during XLOG replay
469  *
470  * This is called when the relation is about to be deleted; we need to ensure
471  * that there is no dangling smgr reference in the xlog relation cache.
472  *
473  * Currently, we don't bother to physically remove the relation from the
474  * cache, we just let it age out normally.
475  *
476  * This also takes care of removing any open "invalid-page" records for
477  * the relation.
478  */
479 void
480 XLogDropRelation(RelFileNode rnode)
481 {
482         XLogRelCacheEntry *hentry;
483
484         hentry = (XLogRelCacheEntry *)
485                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
486
487         if (hentry)
488         {
489                 XLogRelDesc *rdesc = hentry->rdesc;
490
491                 RelationCloseSmgr(&(rdesc->reldata));
492         }
493
494         forget_invalid_pages(rnode, 0);
495 }
496
497 /*
498  * Drop a whole database during XLOG replay
499  *
500  * As above, but for DROP DATABASE instead of dropping a single rel
501  */
502 void
503 XLogDropDatabase(Oid dbid)
504 {
505         HASH_SEQ_STATUS status;
506         XLogRelCacheEntry *hentry;
507
508         hash_seq_init(&status, _xlrelcache);
509
510         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
511         {
512                 XLogRelDesc *rdesc = hentry->rdesc;
513
514                 if (hentry->rnode.dbNode == dbid)
515                         RelationCloseSmgr(&(rdesc->reldata));
516         }
517
518         forget_invalid_pages_db(dbid);
519 }
520
521 /*
522  * Truncate a relation during XLOG replay
523  *
524  * We don't need to do anything to the fake relcache, but we do need to
525  * clean up any open "invalid-page" records for the dropped pages.
526  */
527 void
528 XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
529 {
530         forget_invalid_pages(rnode, nblocks);
531 }