Fix assert failure at end of recovery, broken by XLogInsert scaling patch.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 17 Jul 2013 19:57:48 +0000 (22:57 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 17 Jul 2013 20:12:22 +0000 (23:12 +0300)
Initialization of the first XLOG buffer at end-of-recovery was broken for
the case that the last read WAL record ended at a page boundary. Instead of
trying to copy the last full xlog page to the buffer cache in that case,
just set shared state so that the next page is initialized when the first
WAL record after startup is inserted. (that's what we did in earlier
version, too)

To make the shared state required for that case less surprising, replace the
XLogCtl->curridx variable, which was the index of the latest initialized
buffer, with an XLogRecPtr of how far the buffers have been initialized.
That also allows us to get rid of the XLogRecEndPtrToBufIdx macro.

While we're at it, make a similar change for XLogCtl->Write.curridx, getting
rid of that variable and calculating the next buffer to write from
XLogCtl->LogwrtResult instead.

src/backend/access/transam/xlog.c

index c9e3a7af7b3b35b95d017f06e749739c715a8524..8d43660353f5f98d6b171b5f3b3895d37969f4a6 100644 (file)
@@ -457,15 +457,6 @@ typedef struct XLogCtlInsert
    XLogRecPtr  lastBackupStart;
 } XLogCtlInsert;
 
-/*
- * Shared state data for XLogWrite/XLogFlush.
- */
-typedef struct XLogCtlWrite
-{
-   int         curridx;        /* cache index of next block to write */
-   pg_time_t   lastSegSwitchTime;      /* time of last xlog segment switch */
-} XLogCtlWrite;
-
 /*
  * Total shared-memory state for XLOG.
  */
@@ -482,12 +473,12 @@ typedef struct XLogCtlData
    XLogSegNo   lastRemovedSegNo;       /* latest removed/recycled XLOG
                                         * segment */
 
-   /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck */
+   /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
    XLogRecPtr  unloggedLSN;
    slock_t     ulsn_lck;
 
-   /* Protected by WALWriteLock: */
-   XLogCtlWrite Write;
+   /* Time of last xlog segment switch. Protected by WALWriteLock. */
+   pg_time_t   lastSegSwitchTime;
 
    /*
     * Protected by info_lck and WALWriteLock (you must hold either lock to
@@ -496,15 +487,15 @@ typedef struct XLogCtlData
    XLogwrtResult LogwrtResult;
 
    /*
-    * Latest initialized block index in cache.
+    * Latest initialized page in the cache (last byte position + 1).
     *
-    * To change curridx and the identity of a buffer, you need to hold
-    * WALBufMappingLock.  To change the identity of a buffer that's still
+    * To change the identity of a buffer (and InitializedUpTo), you need to
+    * hold WALBufMappingLock.  To change the identity of a buffer that's still
     * dirty, the old page needs to be written out first, and for that you
     * need WALWriteLock, and you need to ensure that there are no in-progress
     * insertions to the page by calling WaitXLogInsertionsToFinish().
     */
-   int         curridx;
+   XLogRecPtr  InitializedUpTo;
 
    /*
     * These values do not change after startup, although the pointed-to pages
@@ -618,16 +609,10 @@ static ControlFileData *ControlFile = NULL;
 /*
  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
  * would hold if it was in cache, the page containing 'recptr'.
- *
- * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
- * page is taken to mean the previous page.
  */
 #define XLogRecPtrToBufIdx(recptr) \
    (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define XLogRecEndPtrToBufIdx(recptr)  \
-   ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
-
 /*
  * These are the number of bytes in a WAL page and segment usable for WAL data.
  */
@@ -2409,9 +2394,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
     * Now that we have the lock, check if someone initialized the page
     * already.
     */
-   while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic)
+   while (upto >= XLogCtl->InitializedUpTo || opportunistic)
    {
-       nextidx = NextBufIdx(XLogCtl->curridx);
+       nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
 
        /*
         * Get ending-offset of the buffer page we need to replace (this may
@@ -2484,11 +2469,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
         * Now the next buffer slot is free and we can set it up to be the next
         * output page.
         */
-       NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
+       NewPageBeginPtr = XLogCtl->InitializedUpTo;
        NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-       Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
-       Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
        Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
        NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
@@ -2547,7 +2530,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
        *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
-       XLogCtl->curridx = nextidx;
+       XLogCtl->InitializedUpTo = NewPageEndPtr;
 
        npages++;
    }
@@ -2598,7 +2581,6 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
 static void
 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
-   XLogCtlWrite *Write = &XLogCtl->Write;
    bool        ispartialpage;
    bool        last_iteration;
    bool        finishing_seg;
@@ -2631,12 +2613,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
    /*
     * Within the loop, curridx is the cache block index of the page to
-    * consider writing.  We advance Write->curridx only after successfully
-    * writing pages.  (Right now, this refinement is useless since we are
-    * going to PANIC if any error occurs anyway; but someday it may come in
-    * useful.)
+    * consider writing.  Begin at the buffer containing the next unwritten
+    * page, or last partially written page.
     */
-   curridx = Write->curridx;
+   curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
 
    while (LogwrtResult.Write < WriteRqst.Write)
    {
@@ -2747,7 +2727,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
            /* Update state for write */
            openLogOff += nbytes;
-           Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
            npages = 0;
 
            /*
@@ -2775,7 +2754,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                if (XLogArchivingActive())
                    XLogArchiveNotifySeg(openLogSegNo);
 
-               Write->lastSegSwitchTime = (pg_time_t) time(NULL);
+               XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
                /*
                 * Request a checkpoint if we've consumed too much xlog since
@@ -2807,7 +2786,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
    }
 
    Assert(npages == 0);
-   Assert(curridx == Write->curridx);
 
    /*
     * If asked to flush, do so
@@ -6021,12 +5999,10 @@ StartupXLOG(void)
    XLogSegNo   endLogSegNo;
    TimeLineID  PrevTimeLineID;
    XLogRecord *record;
-   uint32      freespace;
    TransactionId oldestActiveXID;
    bool        backupEndRequired = false;
    bool        backupFromStandby = false;
    DBState     dbstate_at_startup;
-   int         firstIdx;
    XLogReaderState *xlogreader;
    XLogPageReadPrivate private;
    bool        fast_promoted = false;
@@ -7034,48 +7010,51 @@ StartupXLOG(void)
    openLogOff = 0;
    Insert = &XLogCtl->Insert;
    Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
-
-   firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
-   XLogCtl->curridx = firstIdx;
-
-   XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+   Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
    /*
     * Tricky point here: readBuf contains the *last* block that the LastRec
     * record spans, not the one it starts in.  The last block is indeed the
     * one we want to use.
     */
-   Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize);
-   memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ);
-   Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+   if (EndOfLog % XLOG_BLCKSZ != 0)
+   {
+       char       *page;
+       int         len;
+       int         firstIdx;
+       XLogRecPtr  pageBeginPtr;
 
-   LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+       pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
+       Assert(readOff == pageBeginPtr % XLogSegSize);
 
-   XLogCtl->LogwrtResult = LogwrtResult;
+       firstIdx = XLogRecPtrToBufIdx(EndOfLog);
 
-   XLogCtl->LogwrtRqst.Write = EndOfLog;
-   XLogCtl->LogwrtRqst.Flush = EndOfLog;
+       /* Copy the valid part of the last block, and zero the rest */
+       page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+       len = EndOfLog % XLOG_BLCKSZ;
+       memcpy(page, xlogreader->readBuf, len);
+       memset(page + len, 0, XLOG_BLCKSZ - len);
 
-   freespace = INSERT_FREESPACE(EndOfLog);
-   if (freespace > 0)
-   {
-       /* Make sure rest of page is zero */
-       MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
-       XLogCtl->Write.curridx = firstIdx;
+       XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
+       XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
    }
    else
    {
        /*
-        * Whenever LogwrtResult points to exactly the end of a page,
-        * Write.curridx must point to the *next* page (see XLogWrite()).
-        *
-        * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
-        * this is sufficient.  The first actual attempt to insert a log
-        * record will advance the insert state.
+        * There is no partial block to copy. Just set InitializedUpTo,
+        * and let the first attempt to insert a log record to initialize
+        * the next buffer.
         */
-       XLogCtl->Write.curridx = NextBufIdx(firstIdx);
+       XLogCtl->InitializedUpTo = EndOfLog;
    }
 
+   LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+
+   XLogCtl->LogwrtResult = LogwrtResult;
+
+   XLogCtl->LogwrtRqst.Write = EndOfLog;
+   XLogCtl->LogwrtRqst.Flush = EndOfLog;
+
    /* Pre-scan prepared transactions to find out the range of XIDs present */
    oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
 
@@ -7199,7 +7178,7 @@ StartupXLOG(void)
    LWLockRelease(ControlFileLock);
 
    /* start the archive_timeout timer running */
-   XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
+   XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
    /* also initialize latestCompletedXid, to nextXid - 1 */
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -7710,7 +7689,7 @@ GetLastSegSwitchTime(void)
 
    /* Need WALWriteLock, but shared lock is sufficient */
    LWLockAcquire(WALWriteLock, LW_SHARED);
-   result = XLogCtl->Write.lastSegSwitchTime;
+   result = XLogCtl->lastSegSwitchTime;
    LWLockRelease(WALWriteLock);
 
    return result;