]> granicus.if.org Git - postgresql/commitdiff
Refactor XLogInsert a bit. The rdata entries for backup blocks are now
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 11 Jan 2012 07:46:18 +0000 (09:46 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 11 Jan 2012 09:01:47 +0000 (11:01 +0200)
constructed before acquiring WALInsertLock, which slightly reduces the time
the lock is held. Although I could not measure any benefit in benchmarks,
the code is more readable this way.

src/backend/access/transam/xlog.c

index 8e659620faba41aca7443e99fb6ad33dfb20017c..db7d9930cb6517d03eee9ff277a63e0e191a5a99 100644 (file)
@@ -694,6 +694,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          freespace;
        int                     curridx;
        XLogRecData *rdt;
+       XLogRecData *rdt_lastnormal;
        Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
        bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
        BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
@@ -708,6 +709,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+       uint8           info_orig = info;
 
        /* cross-check on whether we should be here or not */
        if (!XLogInsertAllowed())
@@ -731,23 +733,18 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        }
 
        /*
-        * Here we scan the rdata chain, determine which buffers must be backed
-        * up, and compute the CRC values for the data.  Note that the record
-        * header isn't added into the CRC initially since we don't know the final
-        * length or info bits quite yet.  Thus, the CRC will represent the CRC of
-        * the whole record in the order "rdata, then backup blocks, then record
-        * header".
+        * Here we scan the rdata chain, to determine which buffers must be backed
+        * up.
         *
         * We may have to loop back to here if a race condition is detected below.
         * We could prevent the race by doing all this work while holding the
         * insert lock, but it seems better to avoid doing CRC calculations while
-        * holding the lock.  This means we have to be careful about modifying the
-        * rdata chain until we know we aren't going to loop back again.  The only
-        * change we allow ourselves to make earlier is to set rdt->data = NULL in
-        * chain items we have decided we will have to back up the whole buffer
-        * for.  This is OK because we will certainly decide the same thing again
-        * for those items if we do it over; doing it here saves an extra pass
-        * over the chain later.
+        * holding the lock.
+        *
+        * We add entries for backup blocks to the chain, so that they don't
+        * need any special treatment in the critical section where the chunks are
+        * copied into the WAL buffers. Those entries have to be unlinked from the
+        * chain if we have to loop back here.
         */
 begin:;
        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
@@ -764,7 +761,6 @@ begin:;
         */
        doPageWrites = fullPageWrites || Insert->forcePageWrites;
 
-       INIT_CRC32(rdata_crc);
        len = 0;
        for (rdt = rdata;;)
        {
@@ -772,7 +768,6 @@ begin:;
                {
                        /* Simple data, just include it */
                        len += rdt->len;
-                       COMP_CRC32(rdata_crc, rdt->data, rdt->len);
                }
                else
                {
@@ -783,12 +778,12 @@ begin:;
                                {
                                        /* Buffer already referenced by earlier chain item */
                                        if (dtbuf_bkp[i])
+                                       {
                                                rdt->data = NULL;
+                                               rdt->len = 0;
+                                       }
                                        else if (rdt->data)
-                                       {
                                                len += rdt->len;
-                                               COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                                       }
                                        break;
                                }
                                if (dtbuf[i] == InvalidBuffer)
@@ -800,12 +795,10 @@ begin:;
                                        {
                                                dtbuf_bkp[i] = true;
                                                rdt->data = NULL;
+                                               rdt->len = 0;
                                        }
                                        else if (rdt->data)
-                                       {
                                                len += rdt->len;
-                                               COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                                       }
                                        break;
                                }
                        }
@@ -819,39 +812,6 @@ begin:;
                rdt = rdt->next;
        }
 
-       /*
-        * Now add the backup block headers and data into the CRC
-        */
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               if (dtbuf_bkp[i])
-               {
-                       BkpBlock   *bkpb = &(dtbuf_xlg[i]);
-                       char       *page;
-
-                       COMP_CRC32(rdata_crc,
-                                          (char *) bkpb,
-                                          sizeof(BkpBlock));
-                       page = (char *) BufferGetBlock(dtbuf[i]);
-                       if (bkpb->hole_length == 0)
-                       {
-                               COMP_CRC32(rdata_crc,
-                                                  page,
-                                                  BLCKSZ);
-                       }
-                       else
-                       {
-                               /* must skip the hole */
-                               COMP_CRC32(rdata_crc,
-                                                  page,
-                                                  bkpb->hole_offset);
-                               COMP_CRC32(rdata_crc,
-                                                  page + (bkpb->hole_offset + bkpb->hole_length),
-                                                  BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
-                       }
-               }
-       }
-
        /*
         * NOTE: We disallow len == 0 because it provides a useful bit of extra
         * error checking in ReadRecord.  This means that all callers of
@@ -862,70 +822,20 @@ begin:;
        if (len == 0 && !isLogSwitch)
                elog(PANIC, "invalid xlog record length %u", len);
 
-       START_CRIT_SECTION();
-
-       /* Now wait to get insert lock */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-
-       /*
-        * Check to see if my RedoRecPtr is out of date.  If so, may have to go
-        * back and recompute everything.  This can only happen just after a
-        * checkpoint, so it's better to be slow in this case and fast otherwise.
-        *
-        * If we aren't doing full-page writes then RedoRecPtr doesn't actually
-        * affect the contents of the XLOG record, so we'll update our local copy
-        * but not force a recomputation.
-        */
-       if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
-       {
-               Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
-               RedoRecPtr = Insert->RedoRecPtr;
-
-               if (doPageWrites)
-               {
-                       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-                       {
-                               if (dtbuf[i] == InvalidBuffer)
-                                       continue;
-                               if (dtbuf_bkp[i] == false &&
-                                       XLByteLE(dtbuf_lsn[i], RedoRecPtr))
-                               {
-                                       /*
-                                        * Oops, this buffer now needs to be backed up, but we
-                                        * didn't think so above.  Start over.
-                                        */
-                                       LWLockRelease(WALInsertLock);
-                                       END_CRIT_SECTION();
-                                       goto begin;
-                               }
-                       }
-               }
-       }
-
-       /*
-        * Also check to see if forcePageWrites was just turned on; if we weren't
-        * already doing full-page writes then go back and recompute. (If it was
-        * just turned off, we could recompute the record without full pages, but
-        * we choose not to bother.)
-        */
-       if (Insert->forcePageWrites && !doPageWrites)
-       {
-               /* Oops, must redo it with full-page data */
-               LWLockRelease(WALInsertLock);
-               END_CRIT_SECTION();
-               goto begin;
-       }
-
        /*
         * Make additional rdata chain entries for the backup blocks, so that we
-        * don't need to special-case them in the write loop.  Note that we have
-        * now irrevocably changed the input rdata chain.  At the exit of this
-        * loop, write_len includes the backup block data.
+        * don't need to special-case them in the write loop.  This modifies the
+        * original rdata chain, but we keep a pointer to the last regular entry,
+        * rdt_lastnormal, so that we can undo this if we have to loop back to the
+        * beginning.
+        *
+        * At the exit of this loop, write_len includes the backup block data.
         *
         * Also set the appropriate info bits to show which buffers were backed
         * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
         * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
         */
+       rdt_lastnormal = rdt;
        write_len = len;
        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
@@ -974,6 +884,76 @@ begin:;
                }
        }
 
+       /*
+        * Calculate CRC of the data, including all the backup blocks
+        *
+        * Note that the record header isn't added into the CRC initially since
+        * we don't know the prev-link yet.  Thus, the CRC will represent the CRC
+        * of the whole record in the order: rdata, then backup blocks, then
+        * record header.
+        */
+       INIT_CRC32(rdata_crc);
+       for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+               COMP_CRC32(rdata_crc, rdt->data, rdt->len);
+
+       START_CRIT_SECTION();
+
+       /* Now wait to get insert lock */
+       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+
+       /*
+        * Check to see if my RedoRecPtr is out of date.  If so, may have to go
+        * back and recompute everything.  This can only happen just after a
+        * checkpoint, so it's better to be slow in this case and fast otherwise.
+        *
+        * If we aren't doing full-page writes then RedoRecPtr doesn't actually
+        * affect the contents of the XLOG record, so we'll update our local copy
+        * but not force a recomputation.
+        */
+       if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+       {
+               Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+               RedoRecPtr = Insert->RedoRecPtr;
+
+               if (doPageWrites)
+               {
+                       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+                       {
+                               if (dtbuf[i] == InvalidBuffer)
+                                       continue;
+                               if (dtbuf_bkp[i] == false &&
+                                       XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+                               {
+                                       /*
+                                        * Oops, this buffer now needs to be backed up, but we
+                                        * didn't think so above.  Start over.
+                                        */
+                                       LWLockRelease(WALInsertLock);
+                                       END_CRIT_SECTION();
+                                       rdt_lastnormal->next = NULL;
+                                       info = info_orig;
+                                       goto begin;
+                               }
+                       }
+               }
+       }
+
+       /*
+        * Also check to see if forcePageWrites was just turned on; if we weren't
+        * already doing full-page writes then go back and recompute. (If it was
+        * just turned off, we could recompute the record without full pages, but
+        * we choose not to bother.)
+        */
+       if (Insert->forcePageWrites && !doPageWrites)
+       {
+               /* Oops, must redo it with full-page data. */
+               LWLockRelease(WALInsertLock);
+               END_CRIT_SECTION();
+               rdt_lastnormal->next = NULL;
+               info = info_orig;
+               goto begin;
+       }
+
        /*
         * If there isn't enough space on the current XLOG page for a record
         * header, advance to the next page (leaving the unused space as zeroes).