standby_desc_running_xacts(buf, xlrec);
}
+ else if (info == XLOG_INVALIDATIONS)
+ {
+ xl_invalidations *xlrec = (xl_invalidations *) rec;
+
+ standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+ xlrec->dbId, xlrec->tsId,
+ xlrec->relcacheInitFileInval);
+ }
}
const char *
case XLOG_RUNNING_XACTS:
id = "RUNNING_XACTS";
break;
+ case XLOG_INVALIDATIONS:
+ id = "INVALIDATIONS";
+ break;
}
return id;
}
+
+/*
+ * This routine is used by both standby_desc and xact_desc, because
+ * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
+ * it seems pointless to duplicate the code.
+ */
+void
+standby_desc_invalidations(StringInfo buf,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ Oid dbId, Oid tsId,
+ bool relcacheInitFileInval)
+{
+ int i;
+
+ if (relcacheInitFileInval)
+ appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+ dbId, tsId);
+
+ appendStringInfoString(buf, "; inval msgs:");
+ for (i = 0; i < nmsgs; i++)
+ {
+ SharedInvalidationMessage *msg = &msgs[i];
+
+ if (msg->id >= 0)
+ appendStringInfo(buf, " catcache %d", msg->id);
+ else if (msg->id == SHAREDINVALCATALOG_ID)
+ appendStringInfo(buf, " catalog %u", msg->cat.catId);
+ else if (msg->id == SHAREDINVALRELCACHE_ID)
+ appendStringInfo(buf, " relcache %u", msg->rc.relId);
+ /* not expected, but print something anyway */
+ else if (msg->id == SHAREDINVALSMGR_ID)
+ appendStringInfoString(buf, " smgr");
+ /* not expected, but print something anyway */
+ else if (msg->id == SHAREDINVALRELMAP_ID)
+ appendStringInfoString(buf, " relmap");
+ else if (msg->id == SHAREDINVALRELMAP_ID)
+ appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+ else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+ appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+ else
+ appendStringInfo(buf, " unknown id %d", msg->id);
+ }
+}
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
+#include "storage/standbydefs.h"
#include "utils/timestamp.h"
/*
}
if (parsed.nmsgs > 0)
{
- if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
- appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- parsed.dbId, parsed.tsId);
-
- appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < parsed.nmsgs; i++)
- {
- SharedInvalidationMessage *msg = &parsed.msgs[i];
-
- if (msg->id >= 0)
- appendStringInfo(buf, " catcache %d", msg->id);
- else if (msg->id == SHAREDINVALCATALOG_ID)
- appendStringInfo(buf, " catalog %u", msg->cat.catId);
- else if (msg->id == SHAREDINVALRELCACHE_ID)
- appendStringInfo(buf, " relcache %u", msg->rc.relId);
- /* not expected, but print something anyway */
- else if (msg->id == SHAREDINVALSMGR_ID)
- appendStringInfoString(buf, " smgr");
- /* not expected, but print something anyway */
- else if (msg->id == SHAREDINVALRELMAP_ID)
- appendStringInfoString(buf, " relmap");
- else if (msg->id == SHAREDINVALSNAPSHOT_ID)
- appendStringInfo(buf, " snapshot %u", msg->sn.relId);
- else
- appendStringInfo(buf, " unknown id %d", msg->id);
- }
+ standby_desc_invalidations(
+ buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+ XactCompletionRelcacheInitFileInval(parsed.xinfo));
}
if (XactCompletionForceSyncCommit(parsed.xinfo))
/* Can't have child XIDs either; AssignTransactionId enforces this */
Assert(nchildren == 0);
+ /*
+ * Transactions without an assigned xid can contain invalidation
+ * messages (e.g. explicit relcache invalidations or catcache
+ * invalidations for inplace updates); standbys need to process
+ * those. We can't emit a commit record without an xid, and we don't
+ * want to force assigning an xid, because that'd be problematic for
+ * e.g. vacuum. Hence we emit a bespoke record for the
+ * invalidations. We don't want to use that in case a commit record is
+ * emitted, so they happen synchronously with commits (besides not
+ * wanting to emit more WAL recoreds).
+ */
+ if (nmsgs != 0)
+ {
+ LogStandbyInvalidations(nmsgs, invalMessages,
+ RelcacheInitFileInval);
+ wrote_xlog = true; /* not strictly necessary */
+ }
+
/*
* If we didn't create XLOG entries, we're done here; otherwise we
* should trigger flushing those entries the same as a commit record
break;
case XLOG_STANDBY_LOCK:
break;
+ case XLOG_INVALIDATIONS:
+ {
+ xl_invalidations *invalidations =
+ (xl_invalidations *) XLogRecGetData(r);
+
+ ReorderBufferImmediateInvalidation(
+ ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+ }
+ break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
}
* catalog and we need to update the caches according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
- {
- bool use_subtxn = IsTransactionOrTransactionBlock();
-
- if (use_subtxn)
- BeginInternalSubTransaction("replay");
-
- /*
- * Force invalidations to happen outside of a valid transaction - that
- * way entries will just be marked as invalid without accessing the
- * catalog. That's advantageous because we don't need to setup the
- * full state necessary for catalog access.
- */
- if (use_subtxn)
- AbortCurrentTransaction();
-
- ReorderBufferExecuteInvalidations(rb, txn);
-
- if (use_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
- }
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
else
Assert(txn->ninvalidations == 0);
ReorderBufferCleanupTXN(rb, txn);
}
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations)
+{
+ bool use_subtxn = IsTransactionOrTransactionBlock();
+ int i;
+
+ if (use_subtxn)
+ BeginInternalSubTransaction("replay");
+
+ /*
+ * Force invalidations to happen outside of a valid transaction - that
+ * way entries will just be marked as invalid without accessing the
+ * catalog. That's advantageous because we don't need to setup the
+ * full state necessary for catalog access.
+ */
+ if (use_subtxn)
+ AbortCurrentTransaction();
+
+ for (i = 0; i < ninvalidations; i++)
+ LocalExecuteInvalidationMessage(&invalidations[i]);
+
+ if (use_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+}
/*
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
ProcArrayApplyRecoveryInfo(&running);
}
+ else if (info == XLOG_INVALIDATIONS)
+ {
+ xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
+
+ ProcessCommittedInvalidationMessages(xlrec->msgs,
+ xlrec->nmsgs,
+ xlrec->relcacheInitFileInval,
+ xlrec->dbId,
+ xlrec->tsId);
+ }
else
elog(PANIC, "standby_redo: unknown op code %u", info);
}
*/
(void) GetTopTransactionId();
}
+
+/*
+ * Emit WAL for invalidations. This currently is only used for commits without
+ * an xid but which contain invalidations.
+ */
+void
+LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInitFileInval)
+{
+ xl_invalidations xlrec;
+
+ /* prepare record */
+ memset(&xlrec, 0, sizeof(xlrec));
+ xlrec.dbId = MyDatabaseId;
+ xlrec.tsId = MyDatabaseTableSpace;
+ xlrec.relcacheInitFileInval = relcacheInitFileInval;
+ xlrec.nmsgs = nmsgs;
+
+ /* perform insertion */
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
+}
}
/*
- * ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
- * to process invalidation messages added to commit records.
+ * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
+ * standby_redo() to process invalidation messages. Currently that happens
+ * only at end-of-xact.
*
* Relcache init file invalidation requires processing both
* before and after we send the SI messages. See AtEOXact_Inval()
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
+void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
extern void LogAccessExclusiveLockPrepare(void);
extern XLogRecPtr LogStandbySnapshot(void);
+extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInitFileInval);
#endif /* STANDBY_H */
#include "access/xlogreader.h"
#include "lib/stringinfo.h"
#include "storage/lockdefs.h"
+#include "storage/sinval.h"
/* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
extern void standby_redo(XLogReaderState *record);
extern void standby_desc(StringInfo buf, XLogReaderState *record);
extern const char *standby_identify(uint8 info);
+extern void standby_desc_invalidations(StringInfo buf,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ Oid dbId, Oid tsId,
+ bool relcacheInitFileInval);
/*
* XLOG message types
*/
#define XLOG_STANDBY_LOCK 0x00
#define XLOG_RUNNING_XACTS 0x10
+#define XLOG_INVALIDATIONS 0x20
typedef struct xl_standby_locks
{
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
} xl_running_xacts;
+/*
+ * Invalidations for standby, currently only when transactions without an
+ * assigned xid commit.
+ */
+typedef struct xl_invalidations
+{
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+ bool relcacheInitFileInval; /* invalidate relcache init file */
+ int nmsgs; /* number of shared inval msgs */
+ SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_invalidations;
+
+#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
+
#endif /* STANDBYDEFS_H */