]> granicus.if.org Git - postgresql/commitdiff
Emit invalidations to standby for transactions without xid.
authorAndres Freund <andres@anarazel.de>
Sun, 24 Apr 2016 02:18:00 +0000 (19:18 -0700)
committerAndres Freund <andres@anarazel.de>
Wed, 27 Apr 2016 03:21:54 +0000 (20:21 -0700)
So far, when a transaction with pending invalidations, but without an
assigned xid, committed, we simply ignored those invalidation
messages. That's problematic, because those are actually sent for a
reason.

Known symptoms of this include that existing sessions on a hot-standby
replica sometimes fail to notice new concurrently built indexes and
visibility map updates.

The solution is to WAL log such invalidations in transactions without an
xid. We considered to alternatively force-assign an xid, but that'd be
problematic for vacuum, which might be run in systems with few xids.

Important: This adds a new WAL record, but as the patch has to be
back-patched, we can't bump the WAL page magic. This means that standbys
have to be updated before primaries; otherwise
"PANIC: standby_redo: unknown op code 32" errors can be encountered.

XXX:

Reported-By: Васильев Дмитрий, Masahiko Sawada
Discussion:
    CAB-SwXY6oH=9twBkXJtgR4UC1NqT-vpYAtxCseME62ADwyK5OA@mail.gmail.com
    CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w@mail.gmail.com

src/backend/access/rmgrdesc/standbydesc.c
src/backend/access/rmgrdesc/xactdesc.c
src/backend/access/transam/xact.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/reorderbuffer.c
src/backend/storage/ipc/standby.c
src/backend/utils/cache/inval.c
src/include/replication/reorderbuffer.h
src/include/storage/standby.h
src/include/storage/standbydefs.h

index 4872cfb2d96a6641204cfca949b41009ff04e344..e6172ccdf73b7d97a922b6eef5e00b9cd7cdc405 100644 (file)
@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 
                standby_desc_running_xacts(buf, xlrec);
        }
+       else if (info == XLOG_INVALIDATIONS)
+       {
+               xl_invalidations *xlrec = (xl_invalidations *) rec;
+
+               standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+                                                                  xlrec->dbId, xlrec->tsId,
+                                                                  xlrec->relcacheInitFileInval);
+       }
 }
 
 const char *
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
                case XLOG_RUNNING_XACTS:
                        id = "RUNNING_XACTS";
                        break;
+               case XLOG_INVALIDATIONS:
+                       id = "INVALIDATIONS";
+                       break;
        }
 
        return id;
 }
+
+/*
+ * This routine is used by both standby_desc and xact_desc, because
+ * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
+ * it seems pointless to duplicate the code.
+ */
+void
+standby_desc_invalidations(StringInfo buf,
+                                                  int nmsgs, SharedInvalidationMessage *msgs,
+                                                  Oid dbId, Oid tsId,
+                                                  bool relcacheInitFileInval)
+{
+       int i;
+
+       if (relcacheInitFileInval)
+               appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+                                                dbId, tsId);
+
+       appendStringInfoString(buf, "; inval msgs:");
+       for (i = 0; i < nmsgs; i++)
+       {
+               SharedInvalidationMessage *msg = &msgs[i];
+
+               if (msg->id >= 0)
+                       appendStringInfo(buf, " catcache %d", msg->id);
+               else if (msg->id == SHAREDINVALCATALOG_ID)
+                       appendStringInfo(buf, " catalog %u", msg->cat.catId);
+               else if (msg->id == SHAREDINVALRELCACHE_ID)
+                       appendStringInfo(buf, " relcache %u", msg->rc.relId);
+               /* not expected, but print something anyway */
+               else if (msg->id == SHAREDINVALSMGR_ID)
+                       appendStringInfoString(buf, " smgr");
+               /* not expected, but print something anyway */
+               else if (msg->id == SHAREDINVALRELMAP_ID)
+                       appendStringInfoString(buf, " relmap");
+               else if (msg->id == SHAREDINVALRELMAP_ID)
+                       appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+               else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+                       appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+               else
+                       appendStringInfo(buf, " unknown id %d", msg->id);
+       }
+}
index e8a334c17dbd083fd0ce4f6b1f8ea8225d99600e..6f07c5cfaacd19fea40040d0ceb221bb852c1e4f 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "storage/sinval.h"
+#include "storage/standbydefs.h"
 #include "utils/timestamp.h"
 
 /*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
        }
        if (parsed.nmsgs > 0)
        {
-               if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
-                       appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
-                                                        parsed.dbId, parsed.tsId);
-
-               appendStringInfoString(buf, "; inval msgs:");
-               for (i = 0; i < parsed.nmsgs; i++)
-               {
-                       SharedInvalidationMessage *msg = &parsed.msgs[i];
-
-                       if (msg->id >= 0)
-                               appendStringInfo(buf, " catcache %d", msg->id);
-                       else if (msg->id == SHAREDINVALCATALOG_ID)
-                               appendStringInfo(buf, " catalog %u", msg->cat.catId);
-                       else if (msg->id == SHAREDINVALRELCACHE_ID)
-                               appendStringInfo(buf, " relcache %u", msg->rc.relId);
-                       /* not expected, but print something anyway */
-                       else if (msg->id == SHAREDINVALSMGR_ID)
-                               appendStringInfoString(buf, " smgr");
-                       /* not expected, but print something anyway */
-                       else if (msg->id == SHAREDINVALRELMAP_ID)
-                               appendStringInfoString(buf, " relmap");
-                       else if (msg->id == SHAREDINVALSNAPSHOT_ID)
-                               appendStringInfo(buf, " snapshot %u", msg->sn.relId);
-                       else
-                               appendStringInfo(buf, " unknown id %d", msg->id);
-               }
+               standby_desc_invalidations(
+                       buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+                       XactCompletionRelcacheInitFileInval(parsed.xinfo));
        }
 
        if (XactCompletionForceSyncCommit(parsed.xinfo))
index 7e3733161390f1b01b8958b8787a21940c669664..95690ff36cb3477839a90260918b8ee3e9d49d64 100644 (file)
@@ -1163,6 +1163,24 @@ RecordTransactionCommit(void)
                /* Can't have child XIDs either; AssignTransactionId enforces this */
                Assert(nchildren == 0);
 
+               /*
+                * Transactions without an assigned xid can contain invalidation
+                * messages (e.g. explicit relcache invalidations or catcache
+                * invalidations for inplace updates); standbys need to process
+                * those. We can't emit a commit record without an xid, and we don't
+                * want to force assigning an xid, because that'd be problematic for
+                * e.g. vacuum.  Hence we emit a bespoke record for the
+                * invalidations. We don't want to use that in case a commit record is
+                * emitted, so they happen synchronously with commits (besides not
+                * wanting to emit more WAL recoreds).
+                */
+               if (nmsgs != 0)
+               {
+                       LogStandbyInvalidations(nmsgs, invalMessages,
+                                                                       RelcacheInitFileInval);
+                       wrote_xlog = true; /* not strictly necessary */
+               }
+
                /*
                 * If we didn't create XLOG entries, we're done here; otherwise we
                 * should trigger flushing those entries the same as a commit record
index 0cdb0b8a92b73945ff19cdb68dea0c0d01e3b433..0c248f07e8f677200e2ff4744e8a8bdc9b3b9717 100644 (file)
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                        break;
                case XLOG_STANDBY_LOCK:
                        break;
+               case XLOG_INVALIDATIONS:
+                       {
+                               xl_invalidations *invalidations =
+                                       (xl_invalidations *) XLogRecGetData(r);
+
+                               ReorderBufferImmediateInvalidation(
+                                       ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+                       }
+                       break;
                default:
                        elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
        }
index 45207086ac0dafae9a0f9af0e46344e9e44842fd..57821c34027e442137d33258bdf49fbf828cea51 100644 (file)
@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
         * catalog and we need to update the caches according to that.
         */
        if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
-       {
-               bool            use_subtxn = IsTransactionOrTransactionBlock();
-
-               if (use_subtxn)
-                       BeginInternalSubTransaction("replay");
-
-               /*
-                * Force invalidations to happen outside of a valid transaction - that
-                * way entries will just be marked as invalid without accessing the
-                * catalog. That's advantageous because we don't need to setup the
-                * full state necessary for catalog access.
-                */
-               if (use_subtxn)
-                       AbortCurrentTransaction();
-
-               ReorderBufferExecuteInvalidations(rb, txn);
-
-               if (use_subtxn)
-                       RollbackAndReleaseCurrentSubTransaction();
-       }
+               ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+                                                                                  txn->invalidations);
        else
                Assert(txn->ninvalidations == 0);
 
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
        ReorderBufferCleanupTXN(rb, txn);
 }
 
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+                                                                  SharedInvalidationMessage *invalidations)
+{
+       bool            use_subtxn = IsTransactionOrTransactionBlock();
+       int                     i;
+
+       if (use_subtxn)
+               BeginInternalSubTransaction("replay");
+
+       /*
+        * Force invalidations to happen outside of a valid transaction - that
+        * way entries will just be marked as invalid without accessing the
+        * catalog. That's advantageous because we don't need to setup the
+        * full state necessary for catalog access.
+        */
+       if (use_subtxn)
+               AbortCurrentTransaction();
+
+       for (i = 0; i < ninvalidations; i++)
+               LocalExecuteInvalidationMessage(&invalidations[i]);
+
+       if (use_subtxn)
+               RollbackAndReleaseCurrentSubTransaction();
+}
 
 /*
  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
index 6a9bf842d3923affc66e251dcd277afcab1ec7e9..762dfa65eb967bd6eb2e88015911cd1e27cf8a65 100644 (file)
@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
 
                ProcArrayApplyRecoveryInfo(&running);
        }
+       else if (info == XLOG_INVALIDATIONS)
+       {
+               xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
+
+               ProcessCommittedInvalidationMessages(xlrec->msgs,
+                                                                                        xlrec->nmsgs,
+                                                                                        xlrec->relcacheInitFileInval,
+                                                                                        xlrec->dbId,
+                                                                                        xlrec->tsId);
+       }
        else
                elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
         */
        (void) GetTopTransactionId();
 }
+
+/*
+ * Emit WAL for invalidations. This currently is only used for commits without
+ * an xid but which contain invalidations.
+ */
+void
+LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+                                               bool relcacheInitFileInval)
+{
+       xl_invalidations xlrec;
+
+       /* prepare record */
+       memset(&xlrec, 0, sizeof(xlrec));
+       xlrec.dbId = MyDatabaseId;
+       xlrec.tsId = MyDatabaseTableSpace;
+       xlrec.relcacheInitFileInval = relcacheInitFileInval;
+       xlrec.nmsgs = nmsgs;
+
+       /* perform insertion */
+       XLogBeginInsert();
+       XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
+       XLogRegisterData((char *) msgs,
+                                        nmsgs * sizeof(SharedInvalidationMessage));
+       XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
+}
index 924bebbac52554215e6ef798bd9afff12d3146ff..58035182298fd40510347584b9ba229f080ea049 100644 (file)
@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
 }
 
 /*
- * ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
- * to process invalidation messages added to commit records.
+ * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
+ * standby_redo() to process invalidation messages. Currently that happens
+ * only at end-of-xact.
  *
  * Relcache init file invalidation requires processing both
  * before and after we send the SI messages. See AtEOXact_Inval()
index 4c54953a5127c52ed1f48e13943d37b063fc6a9a..e0708940a04e11246edb15145ec1861570e91cba 100644 (file)
@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
                                                 CommandId cmin, CommandId cmax, CommandId combocid);
 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
                                                          Size nmsgs, SharedInvalidationMessage *msgs);
+void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+                                                                               SharedInvalidationMessage *invalidations);
 void           ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 void           ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 bool           ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
index aafc9b8a4828f6841350438dc217a91e1644fa75..52058840a591cd7947c01f4f5b9bcfc97b19efa5 100644 (file)
@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
 extern XLogRecPtr LogStandbySnapshot(void);
+extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+                                                                       bool relcacheInitFileInval);
 
 #endif   /* STANDBY_H */
index 609d06edeeb5418b81f76b29f41a08980e4d3057..bd3c97fe434325040af2ebf2dfde91ab2011f2f3 100644 (file)
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "storage/lockdefs.h"
+#include "storage/sinval.h"
 
 /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
 extern void standby_redo(XLogReaderState *record);
 extern void standby_desc(StringInfo buf, XLogReaderState *record);
 extern const char *standby_identify(uint8 info);
+extern void standby_desc_invalidations(StringInfo buf,
+                                                                          int nmsgs, SharedInvalidationMessage *msgs,
+                                                                          Oid dbId, Oid tsId,
+                                                                          bool relcacheInitFileInval);
 
 /*
  * XLOG message types
  */
 #define XLOG_STANDBY_LOCK                      0x00
 #define XLOG_RUNNING_XACTS                     0x10
+#define XLOG_INVALIDATIONS                     0x20
 
 typedef struct xl_standby_locks
 {
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
        TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
 
+/*
+ * Invalidations for standby, currently only when transactions without an
+ * assigned xid commit.
+ */
+typedef struct xl_invalidations
+{
+       Oid                     dbId;                   /* MyDatabaseId */
+       Oid                     tsId;                   /* MyDatabaseTableSpace */
+       bool            relcacheInitFileInval;  /* invalidate relcache init file */
+       int                     nmsgs;                  /* number of shared inval msgs */
+       SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_invalidations;
+
+#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
+
 #endif   /* STANDBYDEFS_H */