]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlogutils.c
Make the world safe for full_page_writes. Allow XLOG records that try to
[postgresql] / src / backend / access / transam / xlogutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlogutils.c
4  *
5  * PostgreSQL transaction log manager utility routines
6  *
7  * This file contains support routines that are used by XLOG replay functions.
8  * None of this code is used during normal system operation.
9  *
10  *
11  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.44 2006/04/14 20:27:24 tgl Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/xlogutils.h"
21 #include "storage/bufmgr.h"
22 #include "storage/bufpage.h"
23 #include "storage/smgr.h"
24 #include "utils/hsearch.h"
25
26
27 /*
28  * During XLOG replay, we may see XLOG records for incremental updates of
29  * pages that no longer exist, because their relation was later dropped or
30  * truncated.  (Note: this is only possible when full_page_writes = OFF,
31  * since when it's ON, the first reference we see to a page should always
32  * be a full-page rewrite not an incremental update.)  Rather than simply
33  * ignoring such records, we make a note of the referenced page, and then
34  * complain if we don't actually see a drop or truncate covering the page
35  * later in replay.
36  */
37 typedef struct xl_invalid_page_key
38 {
39         RelFileNode node;                       /* the relation */
40         BlockNumber blkno;                      /* the page */
41 } xl_invalid_page_key;
42
43 typedef struct xl_invalid_page
44 {
45         xl_invalid_page_key key;        /* hash key ... must be first */
46         bool            present;                /* page existed but contained zeroes */
47 } xl_invalid_page;
48
49 static HTAB *invalid_page_tab = NULL;
50
51
52 /* Log a reference to an invalid page */
53 static void
54 log_invalid_page(RelFileNode node, BlockNumber blkno, bool present)
55 {
56         xl_invalid_page_key key;
57         xl_invalid_page *hentry;
58         bool            found;
59
60         /*
61          * Log references to invalid pages at DEBUG1 level.  This allows some
62          * tracing of the cause (note the elog context mechanism will tell us
63          * something about the XLOG record that generated the reference).
64          */
65         if (present)
66                 elog(DEBUG1, "page %u of relation %u/%u/%u is uninitialized",
67                          blkno, node.spcNode, node.dbNode, node.relNode);
68         else
69                 elog(DEBUG1, "page %u of relation %u/%u/%u does not exist",
70                          blkno, node.spcNode, node.dbNode, node.relNode);
71
72         if (invalid_page_tab == NULL)
73         {
74                 /* create hash table when first needed */
75                 HASHCTL         ctl;
76
77                 memset(&ctl, 0, sizeof(ctl));
78                 ctl.keysize = sizeof(xl_invalid_page_key);
79                 ctl.entrysize = sizeof(xl_invalid_page);
80                 ctl.hash = tag_hash;
81
82                 invalid_page_tab = hash_create("XLOG invalid-page table",
83                                                                            100,
84                                                                            &ctl,
85                                                                            HASH_ELEM | HASH_FUNCTION);
86         }
87
88         /* we currently assume xl_invalid_page_key contains no padding */
89         key.node = node;
90         key.blkno = blkno;
91         hentry = (xl_invalid_page *)
92                 hash_search(invalid_page_tab, (void *) &key, HASH_ENTER, &found);
93
94         if (!found)
95         {
96                 /* hash_search already filled in the key */
97                 hentry->present = present;
98         }
99         else
100         {
101                 /* repeat reference ... leave "present" as it was */
102         }
103 }
104
105 /* Forget any invalid pages >= minblkno, because they've been dropped */
106 static void
107 forget_invalid_pages(RelFileNode node, BlockNumber minblkno)
108 {
109         HASH_SEQ_STATUS status;
110         xl_invalid_page *hentry;
111
112         if (invalid_page_tab == NULL)
113                 return;                                 /* nothing to do */
114
115         hash_seq_init(&status, invalid_page_tab);
116
117         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
118         {
119                 if (RelFileNodeEquals(hentry->key.node, node) &&
120                         hentry->key.blkno >= minblkno)
121                 {
122                         elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
123                                  hentry->key.blkno, hentry->key.node.spcNode,
124                                  hentry->key.node.dbNode, hentry->key.node.relNode);
125
126                         if (hash_search(invalid_page_tab,
127                                                         (void *) &hentry->key,
128                                                         HASH_REMOVE, NULL) == NULL)
129                                 elog(ERROR, "hash table corrupted");
130                 }
131         }
132 }
133
134 /* Forget any invalid pages in a whole database */
135 static void
136 forget_invalid_pages_db(Oid dbid)
137 {
138         HASH_SEQ_STATUS status;
139         xl_invalid_page *hentry;
140
141         if (invalid_page_tab == NULL)
142                 return;                                 /* nothing to do */
143
144         hash_seq_init(&status, invalid_page_tab);
145
146         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
147         {
148                 if (hentry->key.node.dbNode == dbid)
149                 {
150                         elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
151                                  hentry->key.blkno, hentry->key.node.spcNode,
152                                  hentry->key.node.dbNode, hentry->key.node.relNode);
153
154                         if (hash_search(invalid_page_tab,
155                                                         (void *) &hentry->key,
156                                                         HASH_REMOVE, NULL) == NULL)
157                                 elog(ERROR, "hash table corrupted");
158                 }
159         }
160 }
161
162 /* Complain about any remaining invalid-page entries */
163 void
164 XLogCheckInvalidPages(void)
165 {
166         HASH_SEQ_STATUS status;
167         xl_invalid_page *hentry;
168         bool            foundone = false;
169
170         if (invalid_page_tab == NULL)
171                 return;                                 /* nothing to do */
172
173         hash_seq_init(&status, invalid_page_tab);
174
175         /*
176          * Our strategy is to emit WARNING messages for all remaining entries
177          * and only PANIC after we've dumped all the available info.
178          */
179         while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
180         {
181                 if (hentry->present)
182                         elog(WARNING, "page %u of relation %u/%u/%u was uninitialized",
183                                  hentry->key.blkno, hentry->key.node.spcNode,
184                                  hentry->key.node.dbNode, hentry->key.node.relNode);
185                 else
186                         elog(WARNING, "page %u of relation %u/%u/%u did not exist",
187                                  hentry->key.blkno, hentry->key.node.spcNode,
188                                  hentry->key.node.dbNode, hentry->key.node.relNode);
189                 foundone = true;
190         }
191
192         if (foundone)
193                 elog(PANIC, "WAL contains references to invalid pages");
194 }
195
196
197 /*
198  * XLogReadBuffer
199  *              Read a page during XLOG replay
200  *
201  * This is functionally comparable to ReadBuffer followed by
202  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
203  * and locked buffer.  (Getting the lock is not really necessary, since we
204  * expect that this is only used during single-process XLOG replay, but
205  * some subroutines such as MarkBufferDirty will complain if we don't.)
206  *
207  * If "init" is true then the caller intends to rewrite the page fully
208  * using the info in the XLOG record.  In this case we will extend the
209  * relation if needed to make the page exist, and we will not complain about
210  * the page being "new" (all zeroes).
211  *
212  * If "init" is false then the caller needs the page to be valid already.
213  * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
214  * In this case the caller should silently skip the update on this page.
215  * (In this situation, we expect that the page was later dropped or truncated.
216  * If we don't see evidence of that later in the WAL sequence, we'll complain
217  * at the end of WAL replay.)
218  */
219 Buffer
220 XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
221 {
222         BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
223         Buffer          buffer;
224
225         Assert(blkno != P_NEW);
226
227         if (blkno < lastblock)
228         {
229                 /* page exists in file */
230                 buffer = ReadBuffer(reln, blkno);
231         }
232         else
233         {
234                 /* hm, page doesn't exist in file */
235                 if (!init)
236                 {
237                         log_invalid_page(reln->rd_node, blkno, false);
238                         return InvalidBuffer;
239                 }
240                 /* OK to extend the file */
241                 /* we do this in recovery only - no rel-extension lock needed */
242                 Assert(InRecovery);
243                 buffer = InvalidBuffer;
244                 while (blkno >= lastblock)
245                 {
246                         if (buffer != InvalidBuffer)
247                                 ReleaseBuffer(buffer);
248                         buffer = ReadBuffer(reln, P_NEW);
249                         lastblock++;
250                 }
251                 Assert(BufferGetBlockNumber(buffer) == blkno);
252         }
253
254         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
255
256         if (!init)
257         {
258                 /* check that page has been initialized */
259                 Page    page = (Page) BufferGetPage(buffer);
260
261                 if (PageIsNew((PageHeader) page))
262                 {
263                         UnlockReleaseBuffer(buffer);
264                         log_invalid_page(reln->rd_node, blkno, true);
265                         return InvalidBuffer;
266                 }
267         }
268
269         return buffer;
270 }
271
272
273 /*
274  * Lightweight "Relation" cache --- this substitutes for the normal relcache
275  * during XLOG replay.
276  */
277
278 typedef struct XLogRelDesc
279 {
280         RelationData reldata;
281         struct XLogRelDesc *lessRecently;
282         struct XLogRelDesc *moreRecently;
283 } XLogRelDesc;
284
285 typedef struct XLogRelCacheEntry
286 {
287         RelFileNode rnode;
288         XLogRelDesc *rdesc;
289 } XLogRelCacheEntry;
290
291 static HTAB *_xlrelcache;
292 static XLogRelDesc *_xlrelarr = NULL;
293 static Form_pg_class _xlpgcarr = NULL;
294 static int      _xlast = 0;
295 static int      _xlcnt = 0;
296
297 #define _XLOG_RELCACHESIZE      512
298
299 static void
300 _xl_init_rel_cache(void)
301 {
302         HASHCTL         ctl;
303
304         _xlcnt = _XLOG_RELCACHESIZE;
305         _xlast = 0;
306         _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
307         memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
308         _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
309         memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
310
311         _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
312         _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
313
314         memset(&ctl, 0, sizeof(ctl));
315         ctl.keysize = sizeof(RelFileNode);
316         ctl.entrysize = sizeof(XLogRelCacheEntry);
317         ctl.hash = tag_hash;
318
319         _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
320                                                           &ctl, HASH_ELEM | HASH_FUNCTION);
321 }
322
323 static void
324 _xl_remove_hash_entry(XLogRelDesc *rdesc)
325 {
326         Form_pg_class tpgc = rdesc->reldata.rd_rel;
327         XLogRelCacheEntry *hentry;
328
329         rdesc->lessRecently->moreRecently = rdesc->moreRecently;
330         rdesc->moreRecently->lessRecently = rdesc->lessRecently;
331
332         hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
333                                           (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
334         if (hentry == NULL)
335                 elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
336
337         RelationCloseSmgr(&(rdesc->reldata));
338
339         memset(rdesc, 0, sizeof(XLogRelDesc));
340         memset(tpgc, 0, sizeof(FormData_pg_class));
341         rdesc->reldata.rd_rel = tpgc;
342 }
343
344 static XLogRelDesc *
345 _xl_new_reldesc(void)
346 {
347         XLogRelDesc *res;
348
349         _xlast++;
350         if (_xlast < _xlcnt)
351         {
352                 _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
353                 return &(_xlrelarr[_xlast]);
354         }
355
356         /* reuse */
357         res = _xlrelarr[0].moreRecently;
358
359         _xl_remove_hash_entry(res);
360
361         _xlast--;
362         return res;
363 }
364
365
366 void
367 XLogInitRelationCache(void)
368 {
369         _xl_init_rel_cache();
370         invalid_page_tab = NULL;
371 }
372
373 void
374 XLogCloseRelationCache(void)
375 {
376         HASH_SEQ_STATUS status;
377         XLogRelCacheEntry *hentry;
378
379         if (!_xlrelarr)
380                 return;
381
382         hash_seq_init(&status, _xlrelcache);
383
384         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
385                 _xl_remove_hash_entry(hentry->rdesc);
386
387         hash_destroy(_xlrelcache);
388
389         free(_xlrelarr);
390         free(_xlpgcarr);
391
392         _xlrelarr = NULL;
393 }
394
395 /*
396  * Open a relation during XLOG replay
397  *
398  * Note: this once had an API that allowed NULL return on failure, but it
399  * no longer does; any failure results in elog().
400  */
401 Relation
402 XLogOpenRelation(RelFileNode rnode)
403 {
404         XLogRelDesc *res;
405         XLogRelCacheEntry *hentry;
406         bool            found;
407
408         hentry = (XLogRelCacheEntry *)
409                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
410
411         if (hentry)
412         {
413                 res = hentry->rdesc;
414
415                 res->lessRecently->moreRecently = res->moreRecently;
416                 res->moreRecently->lessRecently = res->lessRecently;
417         }
418         else
419         {
420                 res = _xl_new_reldesc();
421
422                 sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
423
424                 res->reldata.rd_node = rnode;
425
426                 /*
427                  * We set up the lockRelId in case anything tries to lock the dummy
428                  * relation.  Note that this is fairly bogus since relNode may be
429                  * different from the relation's OID.  It shouldn't really matter
430                  * though, since we are presumably running by ourselves and can't have
431                  * any lock conflicts ...
432                  */
433                 res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
434                 res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
435
436                 hentry = (XLogRelCacheEntry *)
437                         hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
438
439                 if (found)
440                         elog(PANIC, "xlog relation already present on insert into cache");
441
442                 hentry->rdesc = res;
443
444                 res->reldata.rd_targblock = InvalidBlockNumber;
445                 res->reldata.rd_smgr = NULL;
446                 RelationOpenSmgr(&(res->reldata));
447
448                 /*
449                  * Create the target file if it doesn't already exist.  This lets us
450                  * cope if the replay sequence contains writes to a relation that is
451                  * later deleted.  (The original coding of this routine would instead
452                  * return NULL, causing the writes to be suppressed. But that seems
453                  * like it risks losing valuable data if the filesystem loses an inode
454                  * during a crash.      Better to write the data until we are actually
455                  * told to delete the file.)
456                  */
457                 smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
458         }
459
460         res->moreRecently = &(_xlrelarr[0]);
461         res->lessRecently = _xlrelarr[0].lessRecently;
462         _xlrelarr[0].lessRecently = res;
463         res->lessRecently->moreRecently = res;
464
465         return &(res->reldata);
466 }
467
468 /*
469  * Drop a relation during XLOG replay
470  *
471  * This is called when the relation is about to be deleted; we need to ensure
472  * that there is no dangling smgr reference in the xlog relation cache.
473  *
474  * Currently, we don't bother to physically remove the relation from the
475  * cache, we just let it age out normally.
476  *
477  * This also takes care of removing any open "invalid-page" records for
478  * the relation.
479  */
480 void
481 XLogDropRelation(RelFileNode rnode)
482 {
483         XLogRelCacheEntry *hentry;
484
485         hentry = (XLogRelCacheEntry *)
486                 hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
487
488         if (hentry)
489         {
490                 XLogRelDesc *rdesc = hentry->rdesc;
491
492                 RelationCloseSmgr(&(rdesc->reldata));
493         }
494
495         forget_invalid_pages(rnode, 0);
496 }
497
498 /*
499  * Drop a whole database during XLOG replay
500  *
501  * As above, but for DROP DATABASE instead of dropping a single rel
502  */
503 void
504 XLogDropDatabase(Oid dbid)
505 {
506         HASH_SEQ_STATUS status;
507         XLogRelCacheEntry *hentry;
508
509         hash_seq_init(&status, _xlrelcache);
510
511         while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
512         {
513                 XLogRelDesc *rdesc = hentry->rdesc;
514
515                 if (hentry->rnode.dbNode == dbid)
516                         RelationCloseSmgr(&(rdesc->reldata));
517         }
518
519         forget_invalid_pages_db(dbid);
520 }
521
522 /*
523  * Truncate a relation during XLOG replay
524  *
525  * We don't need to do anything to the fake relcache, but we do need to
526  * clean up any open "invalid-page" records for the dropped pages.
527  */
528 void
529 XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
530 {
531         forget_invalid_pages(rnode, nblocks);
532 }