* storage.c
* code to create and destroy physical storage for relations
*
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/storage.c,v 1.8 2010/02/07 20:48:09 tgl Exp $
+ * src/backend/catalog/storage.c
*
* NOTES
* Some of this code used to be in storage/smgr/smgr.c, and the
#include "access/visibilitymap.h"
#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
#include "storage/freespace.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
* that have been created or deleted in the current transaction. When
* a relation is created, we create the physical file immediately, but
* remember it so that we can delete the file again if the current
- * transaction is aborted. Conversely, a deletion request is NOT
+ * transaction is aborted. Conversely, a deletion request is NOT
* executed immediately, but is just entered in the list. When and if
* the transaction commits, we can delete the physical file.
*
typedef struct PendingRelDelete
{
RelFileNode relnode; /* relation that may need to be deleted */
- bool isTemp; /* is it a temporary relation? */
+ BackendId backend; /* InvalidBackendId if not a temp rel */
bool atCommit; /* T=delete at commit; F=delete at abort */
int nestLevel; /* xact nesting level of request */
struct PendingRelDelete *next; /* linked-list link */
static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
-/*
- * Declarations for smgr-related XLOG records
- *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
- */
-
-/* XLOG gives us high 4 bits */
-#define XLOG_SMGR_CREATE 0x10
-#define XLOG_SMGR_TRUNCATE 0x20
-
-typedef struct xl_smgr_create
-{
- RelFileNode rnode;
-} xl_smgr_create;
-
-typedef struct xl_smgr_truncate
-{
- BlockNumber blkno;
- RelFileNode rnode;
-} xl_smgr_truncate;
-
-
/*
* RelationCreateStorage
* Create physical storage for a relation.
* transaction aborts later on, the storage will be destroyed.
*/
void
-RelationCreateStorage(RelFileNode rnode, bool istemp)
+RelationCreateStorage(RelFileNode rnode, char relpersistence)
{
PendingRelDelete *pending;
- XLogRecPtr lsn;
- XLogRecData rdata;
- xl_smgr_create xlrec;
SMgrRelation srel;
+ BackendId backend;
+ bool needs_wal;
- srel = smgropen(rnode);
- smgrcreate(srel, MAIN_FORKNUM, false);
-
- if (!istemp)
+ switch (relpersistence)
{
- /*
- * Make an XLOG entry reporting the file creation.
- */
- xlrec.rnode = rnode;
+ case RELPERSISTENCE_TEMP:
+ backend = MyBackendId;
+ needs_wal = false;
+ break;
+ case RELPERSISTENCE_UNLOGGED:
+ backend = InvalidBackendId;
+ needs_wal = false;
+ break;
+ case RELPERSISTENCE_PERMANENT:
+ backend = InvalidBackendId;
+ needs_wal = true;
+ break;
+ default:
+ elog(ERROR, "invalid relpersistence: %c", relpersistence);
+ return; /* placate compiler */
+ }
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xlrec);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ srel = smgropen(rnode, backend);
+ smgrcreate(srel, MAIN_FORKNUM, false);
- lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE, &rdata);
- }
+ if (needs_wal)
+ log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
/* Add the relation to the list of stuff to delete at abort */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = rnode;
- pending->isTemp = istemp;
+ pending->backend = backend;
pending->atCommit = false; /* delete if abort */
pending->nestLevel = GetCurrentTransactionNestLevel();
pending->next = pendingDeletes;
pendingDeletes = pending;
}
+/*
+ * log_smgrcreate
+ * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
+ *
+ * rnode identifies the relation and forkNum the fork whose creation is
+ * being logged; both are copied by value into the record's xl_smgr_create
+ * payload, so the caller's RelFileNode is only read, never retained.
+ *
+ * The record is inserted but not flushed here, and the value returned by
+ * XLogInsert() is discarded. On replay, smgr_redo() handles this record
+ * type by calling smgrcreate() for the logged fork.
+ */
+void
+log_smgrcreate(RelFileNode *rnode, ForkNumber forkNum)
+{
+ xl_smgr_create xlrec;
+
+ /*
+ * Make an XLOG entry reporting the file creation.
+ *
+ * The xl_smgr_create struct is the only data registered for the record;
+ * no buffers are registered.
+ */
+ xlrec.rnode = *rnode;
+ xlrec.forkNum = forkNum;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
+}
+
/*
* RelationDropStorage
* Schedule unlinking of physical storage at transaction commit.
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = rel->rd_node;
- pending->isTemp = rel->rd_istemp;
+ pending->backend = rel->rd_backend;
pending->atCommit = true; /* delete if commit */
pending->nestLevel = GetCurrentTransactionNestLevel();
pending->next = pendingDeletes;
* The relation mapper fixes this by telling us to not delete such relations
* after all as part of its commit.
*
+ * We also use this to reuse an old build of an index during ALTER TABLE; in
+ * that case it is the delete-at-commit entry that gets removed.
+ *
* No-op if the relation is not among those scheduled for deletion.
*/
void
-RelationPreserveStorage(RelFileNode rnode)
+RelationPreserveStorage(RelFileNode rnode, bool atCommit)
{
PendingRelDelete *pending;
PendingRelDelete *prev;
for (pending = pendingDeletes; pending != NULL; pending = next)
{
next = pending->next;
- if (RelFileNodeEquals(rnode, pending->relnode))
+ if (RelFileNodeEquals(rnode, pending->relnode)
+ && pending->atCommit == atCommit)
{
- /* we should only find delete-on-abort entries, else trouble */
- if (pending->atCommit)
- elog(ERROR, "cannot preserve a delete-on-commit relation");
/* unlink and delete list entry */
if (prev)
prev->next = next;
* Physically truncate a relation to the specified number of blocks.
*
* This includes getting rid of any buffers for the blocks that are to be
- * dropped. If 'fsm' is true, the FSM of the relation is truncated as well.
+ * dropped.
*/
void
RelationTruncate(Relation rel, BlockNumber nblocks)
/* Open it at the smgr level if not already done */
RelationOpenSmgr(rel);
- /* Make sure rd_targblock isn't pointing somewhere past end */
- rel->rd_targblock = InvalidBlockNumber;
+ /*
+ * Make sure smgr_targblock etc aren't pointing somewhere past new end
+ */
+ rel->rd_smgr->smgr_targblock = InvalidBlockNumber;
+ rel->rd_smgr->smgr_fsm_nblocks = InvalidBlockNumber;
+ rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;
/* Truncate the FSM first if it exists */
fsm = smgrexists(rel->rd_smgr, FSM_FORKNUM);
* failure to truncate, that might spell trouble at WAL replay, into a
* certain PANIC.
*/
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
/*
* Make an XLOG entry reporting the file truncation.
*/
XLogRecPtr lsn;
- XLogRecData rdata;
xl_smgr_truncate xlrec;
xlrec.blkno = nblocks;
xlrec.rnode = rel->rd_node;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xlrec);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
- lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE, &rdata);
+ lsn = XLogInsert(RM_SMGR_ID,
+ XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
/*
* Flush, because otherwise the truncation of the main relation might
}
/* Do the real work */
- smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks, rel->rd_istemp);
+ smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks);
}
/*
*
* This also runs when aborting a subxact; we want to clean up a failed
* subxact immediately.
+ *
+ * Note: It's possible that we're being asked to remove a relation that has
+ * no physical storage in any fork. In particular, it's possible that we're
+ * cleaning up an old temporary relation for which RemovePgTempFiles has
+ * already recovered the physical storage.
*/
void
smgrDoPendingDeletes(bool isCommit)
PendingRelDelete *pending;
PendingRelDelete *prev;
PendingRelDelete *next;
+ int nrels = 0,
+ i = 0,
+ maxrels = 0;
+ SMgrRelation *srels = NULL;
prev = NULL;
for (pending = pendingDeletes; pending != NULL; pending = next)
if (pending->atCommit == isCommit)
{
SMgrRelation srel;
- int i;
- srel = smgropen(pending->relnode);
- for (i = 0; i <= MAX_FORKNUM; i++)
+ srel = smgropen(pending->relnode, pending->backend);
+
+ /* allocate the initial array, or extend it, if needed */
+ if (maxrels == 0)
+ {
+ maxrels = 8;
+ srels = palloc(sizeof(SMgrRelation) * maxrels);
+ }
+ else if (maxrels <= nrels)
{
- if (smgrexists(srel, i))
- smgrdounlink(srel,
- i,
- pending->isTemp,
- false);
+ maxrels *= 2;
+ srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
}
- smgrclose(srel);
+
+ srels[nrels++] = srel;
}
/* must explicitly free the list entry */
pfree(pending);
/* prev does not change */
}
}
+
+ if (nrels > 0)
+ {
+ smgrdounlinkall(srels, nrels, false);
+
+ for (i = 0; i < nrels; i++)
+ smgrclose(srels[i]);
+
+ pfree(srels);
+ }
}
/*
- * smgrGetPendingDeletes() -- Get a list of relations to be deleted.
+ * smgrGetPendingDeletes() -- Get a list of non-temp relations to be deleted.
*
* The return value is the number of relations scheduled for termination.
* *ptr is set to point to a freshly-palloc'd array of RelFileNodes.
* If there are no relations to be deleted, *ptr is set to NULL.
*
- * If haveNonTemp isn't NULL, the bool it points to gets set to true if
- * there is any non-temp table pending to be deleted; false if not.
+ * Only non-temporary relations are included in the returned list. This is OK
+ * because the list is used only in contexts where temporary relations don't
+ * matter: we're either writing to the two-phase state file (and transactions
+ * that have touched temp tables can't be prepared) or we're writing to xlog
+ * (and all temporary files will be zapped if we restart anyway, so no need
+ * for redo to do it also).
*
* Note that the list does not include anything scheduled for termination
* by upper-level transactions.
*/
int
-smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr, bool *haveNonTemp)
+smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
{
int nestLevel = GetCurrentTransactionNestLevel();
int nrels;
PendingRelDelete *pending;
nrels = 0;
- if (haveNonTemp)
- *haveNonTemp = false;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
- if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit)
+ if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
+ && pending->backend == InvalidBackendId)
nrels++;
}
if (nrels == 0)
*ptr = rptr;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
- if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit)
+ if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
+ && pending->backend == InvalidBackendId)
{
*rptr = pending->relnode;
rptr++;
}
- if (haveNonTemp && !pending->isTemp)
- *haveNonTemp = true;
}
return nrels;
}
}
void
-smgr_redo(XLogRecPtr lsn, XLogRecord *record)
+smgr_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ XLogRecPtr lsn = record->EndRecPtr;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/* Backup blocks are not used in smgr records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
if (info == XLOG_SMGR_CREATE)
{
xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(record);
SMgrRelation reln;
- reln = smgropen(xlrec->rnode);
- smgrcreate(reln, MAIN_FORKNUM, true);
+ reln = smgropen(xlrec->rnode, InvalidBackendId);
+ smgrcreate(reln, xlrec->forkNum, true);
}
else if (info == XLOG_SMGR_TRUNCATE)
{
xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
SMgrRelation reln;
+ Relation rel;
- reln = smgropen(xlrec->rnode);
+ reln = smgropen(xlrec->rnode, InvalidBackendId);
/*
* Forcibly create relation if it doesn't exist (which suggests that
* it was dropped somewhere later in the WAL sequence). As in
- * XLogOpenRelation, we prefer to recreate the rel and replay the log
- * as best we can until the drop is seen.
+ * XLogReadBufferForRedo, we prefer to recreate the rel and replay the
+ * log as best we can until the drop is seen.
*/
smgrcreate(reln, MAIN_FORKNUM, true);
- smgrtruncate(reln, MAIN_FORKNUM, xlrec->blkno, false);
+ /*
+ * Before we perform the truncation, update minimum recovery point to
+ * cover this WAL record. Once the relation is truncated, there's no
+ * going back. The buffer manager enforces the WAL-first rule for
+ * normal updates to relation files, so that the minimum recovery
+ * point is always updated before the corresponding change in the data
+ * file is flushed to disk. We have to do the same manually here.
+ *
+ * Doing this before the truncation means that if the truncation fails
+ * for some reason, you cannot start up the system even after restart,
+ * until you fix the underlying situation so that the truncation will
+ * succeed. Alternatively, we could update the minimum recovery point
+ * after truncation, but that would leave a small window where the
+ * WAL-first rule could be violated.
+ */
+ XLogFlush(lsn);
+
+ smgrtruncate(reln, MAIN_FORKNUM, xlrec->blkno);
/* Also tell xlogutils.c about it */
XLogTruncateRelation(xlrec->rnode, MAIN_FORKNUM, xlrec->blkno);
- /* Truncate FSM too */
- if (smgrexists(reln, FSM_FORKNUM))
- {
- Relation rel = CreateFakeRelcacheEntry(xlrec->rnode);
+ /* Truncate FSM and VM too */
+ rel = CreateFakeRelcacheEntry(xlrec->rnode);
+ if (smgrexists(reln, FSM_FORKNUM))
FreeSpaceMapTruncateRel(rel, xlrec->blkno);
- FreeFakeRelcacheEntry(rel);
- }
- }
- else
- elog(PANIC, "smgr_redo: unknown op code %u", info);
-}
-
-void
-smgr_desc(StringInfo buf, uint8 xl_info, char *rec)
-{
- uint8 info = xl_info & ~XLR_INFO_MASK;
+ if (smgrexists(reln, VISIBILITYMAP_FORKNUM))
+ visibilitymap_truncate(rel, xlrec->blkno);
- if (info == XLOG_SMGR_CREATE)
- {
- xl_smgr_create *xlrec = (xl_smgr_create *) rec;
- char *path = relpath(xlrec->rnode, MAIN_FORKNUM);
-
- appendStringInfo(buf, "file create: %s", path);
- pfree(path);
- }
- else if (info == XLOG_SMGR_TRUNCATE)
- {
- xl_smgr_truncate *xlrec = (xl_smgr_truncate *) rec;
- char *path = relpath(xlrec->rnode, MAIN_FORKNUM);
-
- appendStringInfo(buf, "file truncate: %s to %u blocks", path,
- xlrec->blkno);
- pfree(path);
+ FreeFakeRelcacheEntry(rel);
}
else
- appendStringInfo(buf, "UNKNOWN");
+ elog(PANIC, "smgr_redo: unknown op code %u", info);
}