static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
static void BufferSync(int flags);
+static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
- int set_flag_bits);
+ uint32 set_flag_bits);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
static BufferDesc *BufferAlloc(SMgrRelation smgr,
if (isLocalBuf)
{
/* Only need to adjust flags */
- Assert(bufHdr->flags & BM_VALID);
- bufHdr->flags &= ~BM_VALID;
+ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+ Assert(buf_state & BM_VALID);
+ buf_state &= ~BM_VALID;
+ pg_atomic_write_u32(&bufHdr->state, buf_state);
}
else
{
*/
do
{
- LockBufHdr(bufHdr);
- Assert(bufHdr->flags & BM_VALID);
- bufHdr->flags &= ~BM_VALID;
- UnlockBufHdr(bufHdr);
+ uint32 buf_state = LockBufHdr(bufHdr);
+
+ Assert(buf_state & BM_VALID);
+ buf_state &= ~BM_VALID;
+ UnlockBufHdr(bufHdr, buf_state);
} while (!StartBufferIO(bufHdr, true));
}
}
* it's not been recycled) but come right back here to try smgrextend
* again.
*/
- Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
+ Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (isLocalBuf)
{
/* Only need to adjust flags */
- bufHdr->flags |= BM_VALID;
+ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+ buf_state |= BM_VALID;
+ pg_atomic_write_u32(&bufHdr->state, buf_state);
}
else
{
BufferTag oldTag; /* previous identity of selected buffer */
uint32 oldHash; /* hash value for oldTag */
LWLock *oldPartitionLock; /* buffer partition lock for it */
- BufFlags oldFlags;
+ uint32 oldFlags;
int buf_id;
BufferDesc *buf;
bool valid;
+ uint32 buf_state;
/* create a tag so we can lookup the buffer */
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
* Select a victim buffer. The buffer is returned with its header
* spinlock still held!
*/
- buf = StrategyGetBuffer(strategy);
+ buf = StrategyGetBuffer(strategy, &buf_state);
- Assert(buf->refcount == 0);
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
/* Must copy buffer flags while we still hold the spinlock */
- oldFlags = buf->flags;
+ oldFlags = buf_state & BUF_FLAG_MASK;
/* Pin the buffer and then release the buffer spinlock */
PinBuffer_Locked(buf);
XLogRecPtr lsn;
/* Read the LSN while holding buffer header lock */
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
lsn = BufferGetLSN(buf);
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
if (XLogNeedsFlush(lsn) &&
StrategyRejectBuffer(strategy, buf))
/*
* Need to lock the buffer header too in order to change its tag.
*/
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
* recycle this buffer; we must undo everything we've done and start
* over with a new victim buffer.
*/
- oldFlags = buf->flags;
- if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
+ oldFlags = buf_state & BUF_FLAG_MASK;
+ if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
break;
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
BufTableDelete(&newTag, newHash);
if ((oldFlags & BM_TAG_VALID) &&
oldPartitionLock != newPartitionLock)
* 1 so that the buffer can survive one clock-sweep pass.)
*/
buf->tag = newTag;
- buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+ buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
+ BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
+ BUF_USAGECOUNT_MASK);
if (relpersistence == RELPERSISTENCE_PERMANENT)
- buf->flags |= BM_TAG_VALID | BM_PERMANENT;
+ buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
else
- buf->flags |= BM_TAG_VALID;
- buf->usage_count = 1;
+ buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
if (oldFlags & BM_TAG_VALID)
{
BufferTag oldTag;
uint32 oldHash; /* hash value for oldTag */
LWLock *oldPartitionLock; /* buffer partition lock for it */
- BufFlags oldFlags;
+ uint32 oldFlags;
+ uint32 buf_state;
/* Save the original buffer tag before dropping the spinlock */
oldTag = buf->tag;
- UnlockBufHdr(buf);
+ buf_state = pg_atomic_read_u32(&buf->state);
+ Assert(buf_state & BM_LOCKED);
+ UnlockBufHdr(buf, buf_state);
/*
* Need to compute the old tag's hashcode and partition lock ID. XXX is it
LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
/* Re-lock the buffer header */
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
/* If it's changed while we were waiting for lock, do nothing */
if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
{
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
LWLockRelease(oldPartitionLock);
return;
}
* yet done StartBufferIO, WaitIO will fall through and we'll effectively
* be busy-looping here.)
*/
- if (buf->refcount != 0)
+ if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
{
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
LWLockRelease(oldPartitionLock);
/* safety check: should definitely not be our *own* pin */
if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
* Clear out the buffer's tag and flags. We must do this to ensure that
* linear scans of the buffer array don't think the buffer is valid.
*/
- oldFlags = buf->flags;
+ oldFlags = buf_state & BUF_FLAG_MASK;
CLEAR_BUFFERTAG(buf->tag);
- buf->flags = 0;
- buf->usage_count = 0;
-
- UnlockBufHdr(buf);
+ buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+ UnlockBufHdr(buf, buf_state);
/*
* Remove the buffer from the lookup hashtable, if it was in there.
MarkBufferDirty(Buffer buffer)
{
BufferDesc *bufHdr;
+ uint32 buf_state;
+ uint32 old_buf_state;
if (!BufferIsValid(buffer))
elog(ERROR, "bad buffer ID: %d", buffer);
/* unfortunately we can't check if the lock is held exclusively */
Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
- LockBufHdr(bufHdr);
+ old_buf_state = pg_atomic_read_u32(&bufHdr->state);
+ for (;;)
+ {
+ if (old_buf_state & BM_LOCKED)
+ old_buf_state = WaitBufHdrUnlocked(bufHdr);
- Assert(bufHdr->refcount > 0);
+ buf_state = old_buf_state;
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
+
+ if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
+ buf_state))
+ break;
+ }
/*
* If the buffer was not dirty already, do vacuum accounting.
*/
- if (!(bufHdr->flags & BM_DIRTY))
+ if (!(old_buf_state & BM_DIRTY))
{
VacuumPageDirty++;
pgBufferUsage.shared_blks_dirtied++;
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageDirty;
}
-
- bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-
- UnlockBufHdr(bufHdr);
}
/*
*
* This should be applied only to shared buffers, never local ones.
*
+ * Since buffers are pinned/unpinned very frequently, pin buffers without
+ * taking the buffer header lock; instead update the state variable in a loop
+ * of CAS operations.  Hopefully it's just a single CAS.
+ *
* Note that ResourceOwnerEnlargeBuffers must have been done already.
*
* Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows
if (ref == NULL)
{
+ uint32 buf_state;
+ uint32 old_buf_state;
+
ReservePrivateRefCountEntry();
ref = NewPrivateRefCountEntry(b);
- LockBufHdr(buf);
- buf->refcount++;
- if (strategy == NULL)
- {
- if (buf->usage_count < BM_MAX_USAGE_COUNT)
- buf->usage_count++;
- }
- else
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
{
- if (buf->usage_count == 0)
- buf->usage_count = 1;
+ if (old_buf_state & BM_LOCKED)
+ old_buf_state = WaitBufHdrUnlocked(buf);
+
+ buf_state = old_buf_state;
+
+ /* increase refcount */
+ buf_state += BUF_REFCOUNT_ONE;
+
+ /* increase usagecount unless already max */
+ if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT)
+ buf_state += BUF_USAGECOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ buf_state))
+ {
+ result = (buf_state & BM_VALID) != 0;
+ break;
+ }
}
- result = (buf->flags & BM_VALID) != 0;
- UnlockBufHdr(buf);
}
else
{
{
Buffer b;
PrivateRefCountEntry *ref;
+ uint32 buf_state;
/*
* As explained, we don't expect any preexisting pins. That allows us to
*/
Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
- buf->refcount++;
- UnlockBufHdr(buf);
+ /*
+ * Since we hold the buffer spinlock, we can update the buffer state and
+ * release the lock in one operation.
+ */
+ buf_state = pg_atomic_read_u32(&buf->state);
+ Assert(buf_state & BM_LOCKED);
+ buf_state += BUF_REFCOUNT_ONE;
+ UnlockBufHdr(buf, buf_state);
b = BufferDescriptorGetBuffer(buf);
ref->refcount--;
if (ref->refcount == 0)
{
+ uint32 buf_state;
+ uint32 old_buf_state;
+
/* I'd better not still hold any locks on the buffer */
Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
- LockBufHdr(buf);
+ /*
+ * Decrement the shared reference count.
+ *
+ * Since the buffer header spinlock holder can update the state using just
+ * a write, it's not safe to use an atomic decrement here; use a CAS loop
+ * instead.
+ */
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
+ {
+ if (old_buf_state & BM_LOCKED)
+ old_buf_state = WaitBufHdrUnlocked(buf);
+
+ buf_state = old_buf_state;
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ buf_state -= BUF_REFCOUNT_ONE;
- /* Decrement the shared reference count */
- Assert(buf->refcount > 0);
- buf->refcount--;
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ buf_state))
+ break;
+ }
/* Support LockBufferForCleanup() */
- if ((buf->flags & BM_PIN_COUNT_WAITER) &&
- buf->refcount == 1)
+ if (buf_state & BM_PIN_COUNT_WAITER)
{
- /* we just released the last pin other than the waiter's */
- int wait_backend_pid = buf->wait_backend_pid;
+ /*
+ * Acquire the buffer header lock and re-check that there's a waiter.
+ * Another backend could have unpinned this buffer, and already
+ * woken up the waiter. There's no danger of the buffer being
+ * replaced after we unpinned it above, as it's pinned by the
+ * waiter.
+ */
+ buf_state = LockBufHdr(buf);
- buf->flags &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(buf);
- ProcSendSignal(wait_backend_pid);
- }
- else
- UnlockBufHdr(buf);
+ if ((buf_state & BM_PIN_COUNT_WAITER) &&
+ BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+ {
+ /* we just released the last pin other than the waiter's */
+ int wait_backend_pid = buf->wait_backend_pid;
+ buf_state &= ~BM_PIN_COUNT_WAITER;
+ UnlockBufHdr(buf, buf_state);
+ ProcSendSignal(wait_backend_pid);
+ }
+ else
+ UnlockBufHdr(buf, buf_state);
+ }
ForgetPrivateRefCountEntry(ref);
}
}
static void
BufferSync(int flags)
{
+ uint32 buf_state;
int buf_id;
int num_to_scan;
int num_spaces;
* Header spinlock is enough to examine BM_DIRTY, see comment in
* SyncOneBuffer.
*/
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
- if ((bufHdr->flags & mask) == mask)
+ if ((buf_state & mask) == mask)
{
CkptSortItem *item;
- bufHdr->flags |= BM_CHECKPOINT_NEEDED;
+ buf_state |= BM_CHECKPOINT_NEEDED;
item = &CkptBufferIds[num_to_scan++];
item->buf_id = buf_id;
item->blockNum = bufHdr->tag.blockNum;
}
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
if (num_to_scan == 0)
* write the buffer though we didn't need to. It doesn't seem worth
* guarding against this, though.
*/
- if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
+ if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
{
if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
{
/* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{
- int buffer_state = SyncOneBuffer(next_to_clean, true,
- wb_context);
+ int sync_state = SyncOneBuffer(next_to_clean, true,
+ wb_context);
if (++next_to_clean >= NBuffers)
{
}
num_to_scan--;
- if (buffer_state & BUF_WRITTEN)
+ if (sync_state & BUF_WRITTEN)
{
reusable_buffers++;
if (++num_written >= bgwriter_lru_maxpages)
break;
}
}
- else if (buffer_state & BUF_REUSABLE)
+ else if (sync_state & BUF_REUSABLE)
reusable_buffers++;
}
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0;
+ uint32 buf_state;
BufferTag tag;
ReservePrivateRefCountEntry();
* don't worry because our checkpoint.redo points before log record for
* upcoming changes and so we are not required to write such dirty buffer.
*/
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
- if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
+ if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
+ BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
+ {
result |= BUF_REUSABLE;
+ }
else if (skip_recently_used)
{
/* Caller told us not to write recently-used buffers */
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
return result;
}
- if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
+ if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
{
/* It's clean, so nothing to do */
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
return result;
}
int32 loccount;
char *path;
BackendId backend;
+ uint32 buf_state;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
/* theoretically we should lock the bufhdr here */
path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
+ buf_state = pg_atomic_read_u32(&buf->state);
elog(WARNING,
"buffer refcount leak: [%03d] "
"(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
buffer, path,
- buf->tag.blockNum, buf->flags,
- buf->refcount, loccount);
+ buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
+ BUF_STATE_GET_REFCOUNT(buf_state), loccount);
pfree(path);
}
io_time;
Block bufBlock;
char *bufToWrite;
+ uint32 buf_state;
/*
* Acquire the buffer's io_in_progress lock. If StartBufferIO returns
reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode);
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
/*
* Run PageGetLSN while holding header lock, since we don't have the
recptr = BufferGetLSN(buf);
/* To check if block content changes while flushing. - vadim 01/17/97 */
- buf->flags &= ~BM_JUST_DIRTIED;
- UnlockBufHdr(buf);
+ buf_state &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr(buf, buf_state);
/*
* Force XLOG flush up to buffer's LSN. This implements the basic WAL
* disastrous system-wide consequences. To make sure that can't happen,
* skip the flush if the buffer isn't permanent.
*/
- if (buf->flags & BM_PERMANENT)
+ if (buf_state & BM_PERMANENT)
XLogFlush(recptr);
/*
/*
* BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
* need not bother with the buffer header spinlock. Even if someone else
- * changes the buffer header flags while we're doing this, we assume that
- * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
- * old value or the new value, but not random garbage.
+ * changes the buffer header state while we're doing this, the state is
+ * changed atomically, so we'll read the old value or the new value, but
+ * not random garbage.
*/
bufHdr = GetBufferDescriptor(buffer - 1);
- return (bufHdr->flags & BM_PERMANENT) != 0;
+ return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
}
/*
BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
XLogRecPtr lsn;
+ uint32 buf_state;
/*
* If we don't need locking for correctness, fastpath out.
Assert(BufferIsValid(buffer));
Assert(BufferIsPinned(buffer));
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
lsn = PageGetLSN(page);
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
return lsn;
}
for (i = 0; i < NBuffers; i++)
{
BufferDesc *bufHdr = GetBufferDescriptor(i);
+ uint32 buf_state;
/*
* We can make this a tad faster by prechecking the buffer tag before
if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
continue;
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
bufHdr->tag.forkNum == forkNum &&
bufHdr->tag.blockNum >= firstDelBlock)
InvalidateBuffer(bufHdr); /* releases spinlock */
else
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
}
{
RelFileNode *rnode = NULL;
BufferDesc *bufHdr = GetBufferDescriptor(i);
+ uint32 buf_state;
/*
* As in DropRelFileNodeBuffers, an unlocked precheck should be safe
if (rnode == NULL)
continue;
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
InvalidateBuffer(bufHdr); /* releases spinlock */
else
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
pfree(nodes);
for (i = 0; i < NBuffers; i++)
{
BufferDesc *bufHdr = GetBufferDescriptor(i);
+ uint32 buf_state;
/*
* As in DropRelFileNodeBuffers, an unlocked precheck should be safe
if (bufHdr->tag.rnode.dbNode != dbid)
continue;
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid)
InvalidateBuffer(bufHdr); /* releases spinlock */
else
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
}
{
for (i = 0; i < NLocBuffer; i++)
{
+ uint32 buf_state;
+
bufHdr = GetLocalBufferDescriptor(i);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
+ (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
ErrorContextCallback errcallback;
Page localpage;
localpage,
false);
- bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+ buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+ pg_atomic_write_u32(&bufHdr->state, buf_state);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
for (i = 0; i < NBuffers; i++)
{
+ uint32 buf_state;
+
bufHdr = GetBufferDescriptor(i);
/*
ReservePrivateRefCountEntry();
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
UnpinBuffer(bufHdr, true);
}
else
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
}
for (i = 0; i < NBuffers; i++)
{
+ uint32 buf_state;
+
bufHdr = GetBufferDescriptor(i);
/*
ReservePrivateRefCountEntry();
- LockBufHdr(bufHdr);
+ buf_state = LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
UnpinBuffer(bufHdr, true);
}
else
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
}
}
* is only intended to be used in cases where failing to write out the
* data would be harmless anyway, it doesn't really matter.
*/
- if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+ if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
XLogRecPtr lsn = InvalidXLogRecPtr;
bool dirtied = false;
bool delayChkpt = false;
+ uint32 buf_state;
/*
* If we need to protect hint bit updates from torn writes, WAL-log a
* We don't check full_page_writes here because that logic is included
* when we call XLogInsert() since the value changes dynamically.
*/
- if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
+ if (XLogHintBitIsNeeded() &&
+ (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
{
/*
* If we're in recovery we cannot dirty a page because of a hint.
lsn = XLogSaveBufferForHint(buffer, buffer_std);
}
- LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (!(bufHdr->flags & BM_DIRTY))
+ buf_state = LockBufHdr(bufHdr);
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+ if (!(buf_state & BM_DIRTY))
{
dirtied = true; /* Means "will be dirtied by this action" */
if (!XLogRecPtrIsInvalid(lsn))
PageSetLSN(page, lsn);
}
- bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
- UnlockBufHdr(bufHdr);
+
+ buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
+ UnlockBufHdr(bufHdr, buf_state);
if (delayChkpt)
MyPgXact->delayChkpt = false;
if (buf)
{
- LockBufHdr(buf);
+ uint32 buf_state;
+
+ buf_state = LockBufHdr(buf);
/*
* Don't complain if flag bit not set; it could have been reset but we
* got a cancel/die interrupt before getting the signal.
*/
- if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+ if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_pid == MyProcPid)
- buf->flags &= ~BM_PIN_COUNT_WAITER;
+ buf_state &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
PinCountWaitBuf = NULL;
}
for (;;)
{
+ uint32 buf_state;
+
/* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (bufHdr->refcount == 1)
+ buf_state = LockBufHdr(bufHdr);
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
return;
}
/* Failed, so mark myself as waiting for pincount 1 */
- if (bufHdr->flags & BM_PIN_COUNT_WAITER)
+ if (buf_state & BM_PIN_COUNT_WAITER)
{
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "multiple backends attempting to wait for pincount 1");
}
bufHdr->wait_backend_pid = MyProcPid;
- bufHdr->flags |= BM_PIN_COUNT_WAITER;
PinCountWaitBuf = bufHdr;
- UnlockBufHdr(bufHdr);
+ buf_state |= BM_PIN_COUNT_WAITER;
+ UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Report the wait */
* impossible with the current usages due to table level locking, but
* better be safe.
*/
- LockBufHdr(bufHdr);
- if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 &&
+ buf_state = LockBufHdr(bufHdr);
+ if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
bufHdr->wait_backend_pid == MyProcPid)
- bufHdr->flags &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(bufHdr);
+ buf_state &= ~BM_PIN_COUNT_WAITER;
+ UnlockBufHdr(bufHdr, buf_state);
PinCountWaitBuf = NULL;
/* Loop back and try again */
ConditionalLockBufferForCleanup(Buffer buffer)
{
BufferDesc *bufHdr;
+ uint32 buf_state,
+ refcount;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
{
+ refcount = LocalRefCount[-buffer - 1];
/* There should be exactly one pin */
- Assert(LocalRefCount[-buffer - 1] > 0);
- if (LocalRefCount[-buffer - 1] != 1)
+ Assert(refcount > 0);
+ if (refcount != 1)
return false;
/* Nobody else to wait for */
return true;
}
/* There should be exactly one local pin */
- Assert(GetPrivateRefCount(buffer) > 0);
- if (GetPrivateRefCount(buffer) != 1)
+ refcount = GetPrivateRefCount(buffer);
+ Assert(refcount);
+ if (refcount != 1)
return false;
/* Try to acquire lock */
return false;
bufHdr = GetBufferDescriptor(buffer - 1);
- LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (bufHdr->refcount == 1)
+ buf_state = LockBufHdr(bufHdr);
+ refcount = BUF_STATE_GET_REFCOUNT(buf_state);
+
+ Assert(refcount > 0);
+ if (refcount == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
return true;
}
/* Failed, so release the lock */
- UnlockBufHdr(bufHdr);
+ UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
return false;
}
*/
for (;;)
{
- BufFlags sv_flags;
+ uint32 buf_state;
/*
* It may not be necessary to acquire the spinlock to check the flag
* here, but since this test is essential for correctness, we'd better
* play it safe.
*/
- LockBufHdr(buf);
- sv_flags = buf->flags;
- UnlockBufHdr(buf);
- if (!(sv_flags & BM_IO_IN_PROGRESS))
+ buf_state = LockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
+
+ if (!(buf_state & BM_IO_IN_PROGRESS))
break;
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
LWLockRelease(BufferDescriptorGetIOLock(buf));
static bool
StartBufferIO(BufferDesc *buf, bool forInput)
{
+ uint32 buf_state;
+
Assert(!InProgressBuf);
for (;;)
*/
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
- if (!(buf->flags & BM_IO_IN_PROGRESS))
+ if (!(buf_state & BM_IO_IN_PROGRESS))
break;
/*
* an error (see AbortBufferIO). If that's the case, we must wait for
* him to get unwedged.
*/
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
LWLockRelease(BufferDescriptorGetIOLock(buf));
WaitIO(buf);
}
/* Once we get here, there is definitely no I/O active on this buffer */
- if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+ if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
/* someone else already did the I/O */
- UnlockBufHdr(buf);
+ UnlockBufHdr(buf, buf_state);
LWLockRelease(BufferDescriptorGetIOLock(buf));
return false;
}
- buf->flags |= BM_IO_IN_PROGRESS;
-
- UnlockBufHdr(buf);
+ buf_state |= BM_IO_IN_PROGRESS;
+ UnlockBufHdr(buf, buf_state);
InProgressBuf = buf;
IsForInput = forInput;
* be 0, or BM_VALID if we just finished reading in the page.
*/
static void
-TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
+TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
{
+ uint32 buf_state;
+
Assert(buf == InProgressBuf);
- LockBufHdr(buf);
+ buf_state = LockBufHdr(buf);
+
+ Assert(buf_state & BM_IO_IN_PROGRESS);
- Assert(buf->flags & BM_IO_IN_PROGRESS);
- buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
- if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
- buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
- buf->flags |= set_flag_bits;
+ buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+ if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
+ buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
- UnlockBufHdr(buf);
+ buf_state |= set_flag_bits;
+ UnlockBufHdr(buf, buf_state);
InProgressBuf = NULL;
if (buf)
{
+ uint32 buf_state;
+
/*
* Since LWLockReleaseAll has already been called, we're not holding
* the buffer's io_in_progress_lock. We have to re-acquire it so that
*/
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
- LockBufHdr(buf);
- Assert(buf->flags & BM_IO_IN_PROGRESS);
+ buf_state = LockBufHdr(buf);
+ Assert(buf_state & BM_IO_IN_PROGRESS);
if (IsForInput)
{
- Assert(!(buf->flags & BM_DIRTY));
+ Assert(!(buf_state & BM_DIRTY));
+
/* We'd better not think buffer is valid yet */
- Assert(!(buf->flags & BM_VALID));
- UnlockBufHdr(buf);
+ Assert(!(buf_state & BM_VALID));
+ UnlockBufHdr(buf, buf_state);
}
else
{
- BufFlags sv_flags;
-
- sv_flags = buf->flags;
- Assert(sv_flags & BM_DIRTY);
- UnlockBufHdr(buf);
+ Assert(buf_state & BM_DIRTY);
+ UnlockBufHdr(buf, buf_state);
/* Issue notice if this is not the first failure... */
- if (sv_flags & BM_IO_ERROR)
+ if (buf_state & BM_IO_ERROR)
{
/* Buffer is pinned, so we can read tag without spinlock */
char *path;
return 0;
}
+/*
+ * Lock buffer header - set BM_LOCKED in buffer state.
+ */
+uint32
+LockBufHdr(BufferDesc *desc)
+{
+ SpinDelayStatus delayStatus = init_spin_delay(desc);
+ uint32 old_buf_state;
+
+ while (true)
+ {
+ /* set BM_LOCKED flag */
+ old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
+ /* if it wasn't set before we're OK */
+ if (!(old_buf_state & BM_LOCKED))
+ break;
+ perform_spin_delay(&delayStatus);
+ }
+ finish_spin_delay(&delayStatus);
+ return old_buf_state | BM_LOCKED;
+}
+
+/*
+ * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
+ * state at that point.
+ *
+ * Obviously the buffer could be locked by the time the value is returned, so
+ * this is primarily useful in CAS-style loops.
+ */
+static uint32
+WaitBufHdrUnlocked(BufferDesc *buf)
+{
+ SpinDelayStatus delayStatus = init_spin_delay(buf);
+ uint32 buf_state;
+
+ buf_state = pg_atomic_read_u32(&buf->state);
+
+ while (buf_state & BM_LOCKED)
+ {
+ perform_spin_delay(&delayStatus);
+ buf_state = pg_atomic_read_u32(&buf->state);
+ }
+
+ finish_spin_delay(&delayStatus);
+
+ return buf_state;
+}
+
/*
* BufferTag comparator.
*/
* s_lock.c
* Hardware-dependent implementation of spinlocks.
*
+ * When waiting for a contended spinlock we loop tightly for awhile, then
+ * delay using pg_usleep() and try again. Preferably, "awhile" should be a
+ * small multiple of the maximum time we expect a spinlock to be held. 100
+ * iterations seems about right as an initial guess. However, on a
+ * uniprocessor the loop is a waste of cycles, while in a multi-CPU scenario
+ * it's usually better to spin a bit longer than to call the kernel, so we try
+ * to adapt the spin loop count depending on whether we seem to be in a
+ * uniprocessor or multiprocessor.
+ *
+ * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd
+ * be wrong; there are platforms where that can result in a "stuck
+ * spinlock" failure. This has been seen particularly on Alphas; it seems
+ * that the first TAS after returning from kernel space will always fail
+ * on that hardware.
+ *
+ * Once we do decide to block, we use randomly increasing pg_usleep()
+ * delays. The first delay is 1 msec, then the delay randomly increases to
+ * about one second, after which we reset to 1 msec and start again. The
+ * idea here is that in the presence of heavy contention we need to
+ * increase the delay, else the spinlock holder may never get to run and
+ * release the lock. (Consider situation where spinlock holder has been
+ * nice'd down in priority by the scheduler --- it will not get scheduled
+ * until all would-be acquirers are sleeping, so if we always use a 1-msec
+ * sleep, there is a real possibility of starvation.) But we can't just
+ * clamp the delay to an upper bound, else it would take a long time to
+ * make a reasonable number of tries.
+ *
+ * We time out and declare error after NUM_DELAYS delays (thus, exactly
+ * that many tries). With the given settings, this will usually take 2 or
+ * so minutes. It seems better to fix the total number of tries (and thus
+ * the probability of unintended failure) than to fix the total time
+ * spent.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#include "storage/s_lock.h"
#include "storage/barrier.h"
+
+#define MIN_SPINS_PER_DELAY 10
+#define MAX_SPINS_PER_DELAY 1000
+#define NUM_DELAYS 1000
+#define MIN_DELAY_USEC 1000L
+#define MAX_DELAY_USEC 1000000L
+
+
slock_t dummy_spinlock;
static int spins_per_delay = DEFAULT_SPINS_PER_DELAY;
* s_lock_stuck() - complain about a stuck spinlock
*/
static void
-s_lock_stuck(volatile slock_t *lock, const char *file, int line)
+s_lock_stuck(void *p, const char *file, int line)
{
#if defined(S_LOCK_TEST)
fprintf(stderr,
"\nStuck spinlock (%p) detected at %s:%d.\n",
- lock, file, line);
+ p, file, line);
exit(1);
#else
elog(PANIC, "stuck spinlock (%p) detected at %s:%d",
- lock, file, line);
+ p, file, line);
#endif
}
-
/*
* s_lock(lock) - platform-independent portion of waiting for a spinlock.
*/
int
s_lock(volatile slock_t *lock, const char *file, int line)
{
- /*
- * We loop tightly for awhile, then delay using pg_usleep() and try again.
- * Preferably, "awhile" should be a small multiple of the maximum time we
- * expect a spinlock to be held. 100 iterations seems about right as an
- * initial guess. However, on a uniprocessor the loop is a waste of
- * cycles, while in a multi-CPU scenario it's usually better to spin a bit
- * longer than to call the kernel, so we try to adapt the spin loop count
- * depending on whether we seem to be in a uniprocessor or multiprocessor.
- *
- * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd
- * be wrong; there are platforms where that can result in a "stuck
- * spinlock" failure. This has been seen particularly on Alphas; it seems
- * that the first TAS after returning from kernel space will always fail
- * on that hardware.
- *
- * Once we do decide to block, we use randomly increasing pg_usleep()
- * delays. The first delay is 1 msec, then the delay randomly increases to
- * about one second, after which we reset to 1 msec and start again. The
- * idea here is that in the presence of heavy contention we need to
- * increase the delay, else the spinlock holder may never get to run and
- * release the lock. (Consider situation where spinlock holder has been
- * nice'd down in priority by the scheduler --- it will not get scheduled
- * until all would-be acquirers are sleeping, so if we always use a 1-msec
- * sleep, there is a real possibility of starvation.) But we can't just
- * clamp the delay to an upper bound, else it would take a long time to
- * make a reasonable number of tries.
- *
- * We time out and declare error after NUM_DELAYS delays (thus, exactly
- * that many tries). With the given settings, this will usually take 2 or
- * so minutes. It seems better to fix the total number of tries (and thus
- * the probability of unintended failure) than to fix the total time
- * spent.
- */
-#define MIN_SPINS_PER_DELAY 10
-#define MAX_SPINS_PER_DELAY 1000
-#define NUM_DELAYS 1000
-#define MIN_DELAY_USEC 1000L
-#define MAX_DELAY_USEC 1000000L
-
- int spins = 0;
- int delays = 0;
- int cur_delay = 0;
+ SpinDelayStatus delayStatus = init_spin_delay((void *) lock);
while (TAS_SPIN(lock))
{
- /* CPU-specific delay each time through the loop */
- SPIN_DELAY();
+ perform_spin_delay(&delayStatus);
+ }
- /* Block the process every spins_per_delay tries */
- if (++spins >= spins_per_delay)
- {
- if (++delays > NUM_DELAYS)
- s_lock_stuck(lock, file, line);
+ finish_spin_delay(&delayStatus);
- if (cur_delay == 0) /* first time to delay? */
- cur_delay = MIN_DELAY_USEC;
+ return delayStatus.delays;
+}
- pg_usleep(cur_delay);
+#ifdef USE_DEFAULT_S_UNLOCK
+void
+s_unlock(volatile slock_t *lock)
+{
+#ifdef TAS_ACTIVE_WORD
+ /* HP's PA-RISC */
+ *TAS_ACTIVE_WORD(lock) = -1;
+#else
+ *lock = 0;
+#endif
+}
+#endif
+
+/*
+ * Wait while spinning on a contended spinlock.
+ */
+void
+perform_spin_delay(SpinDelayStatus *status)
+{
+ /* CPU-specific delay each time through the loop */
+ SPIN_DELAY();
+
+ /* Block the process every spins_per_delay tries */
+ if (++(status->spins) >= spins_per_delay)
+ {
+ if (++(status->delays) > NUM_DELAYS)
+ s_lock_stuck(status->ptr, status->file, status->line);
+
+ if (status->cur_delay == 0) /* first time to delay? */
+ status->cur_delay = MIN_DELAY_USEC;
+
+ pg_usleep(status->cur_delay);
#if defined(S_LOCK_TEST)
- fprintf(stdout, "*");
- fflush(stdout);
+ fprintf(stdout, "*");
+ fflush(stdout);
#endif
- /* increase delay by a random fraction between 1X and 2X */
- cur_delay += (int) (cur_delay *
+ /* increase delay by a random fraction between 1X and 2X */
+ status->cur_delay += (int) (status->cur_delay *
((double) random() / (double) MAX_RANDOM_VALUE) + 0.5);
- /* wrap back to minimum delay when max is exceeded */
- if (cur_delay > MAX_DELAY_USEC)
- cur_delay = MIN_DELAY_USEC;
+ /* wrap back to minimum delay when max is exceeded */
+ if (status->cur_delay > MAX_DELAY_USEC)
+ status->cur_delay = MIN_DELAY_USEC;
- spins = 0;
- }
+ status->spins = 0;
}
+}
- /*
- * If we were able to acquire the lock without delaying, it's a good
- * indication we are in a multiprocessor. If we had to delay, it's a sign
- * (but not a sure thing) that we are in a uniprocessor. Hence, we
- * decrement spins_per_delay slowly when we had to delay, and increase it
- * rapidly when we didn't. It's expected that spins_per_delay will
- * converge to the minimum value on a uniprocessor and to the maximum
- * value on a multiprocessor.
- *
- * Note: spins_per_delay is local within our current process. We want to
- * average these observations across multiple backends, since it's
- * relatively rare for this function to even get entered, and so a single
- * backend might not live long enough to converge on a good value. That
- * is handled by the two routines below.
- */
- if (cur_delay == 0)
+/*
+ * After acquiring a spinlock, update estimates about how long to loop.
+ *
+ * If we were able to acquire the lock without delaying, it's a good
+ * indication we are in a multiprocessor. If we had to delay, it's a sign
+ * (but not a sure thing) that we are in a uniprocessor. Hence, we
+ * decrement spins_per_delay slowly when we had to delay, and increase it
+ * rapidly when we didn't. It's expected that spins_per_delay will
+ * converge to the minimum value on a uniprocessor and to the maximum
+ * value on a multiprocessor.
+ *
+ * Note: spins_per_delay is local within our current process. We want to
+ * average these observations across multiple backends, since it's
+ * relatively rare for this function to even get entered, and so a single
+ * backend might not live long enough to converge on a good value. That
+ * is handled by the two routines below.
+ */
+void
+finish_spin_delay(SpinDelayStatus *status)
+{
+ if (status->cur_delay == 0)
{
/* we never had to delay */
if (spins_per_delay < MAX_SPINS_PER_DELAY)
if (spins_per_delay > MIN_SPINS_PER_DELAY)
spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY);
}
- return delays;
}
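
For callers outside s_lock() that roll their own wait loop (LockBufHdr above is the in-tree user added by this patch), the intended pattern is roughly the sketch below; my_lock and try_acquire() are illustrative stand-ins, not part of the patch:

	SpinDelayStatus delayStatus = init_spin_delay((void *) &my_lock);

	/* spin, then sleep with randomized backoff, until we get the lock */
	while (!try_acquire(&my_lock))
		perform_spin_delay(&delayStatus);

	/* adapt the spins_per_delay estimate for future acquisitions */
	finish_spin_delay(&delayStatus);
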
-#ifdef USE_DEFAULT_S_UNLOCK
-void
-s_unlock(volatile slock_t *lock)
-{
-#ifdef TAS_ACTIVE_WORD
- /* HP's PA-RISC */
- *TAS_ACTIVE_WORD(lock) = -1;
-#else
- *lock = 0;
-#endif
-}
-#endif
-
/*
* Set local copy of spins_per_delay during backend startup.
*
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
+#include "port/atomics.h"
#include "storage/spin.h"
#include "utils/relcache.h"
+/*
+ * Buffer state is a single 32-bit variable where the following data is combined:
+ *
+ * - 18 bits refcount
+ * - 4 bits usage count
+ * - 10 bits of flags
+ *
+ * Combining these values allows us to perform some operations without locking
+ * the buffer header, by modifying them together in a CAS loop.
+ *
+ * The definition of buffer state components is below.
+ */
+#define BUF_REFCOUNT_ONE 1
+#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
+#define BUF_USAGECOUNT_MASK 0x003C0000U
+#define BUF_USAGECOUNT_ONE (1U << 18)
+#define BUF_USAGECOUNT_SHIFT 18
+#define BUF_FLAG_MASK 0xFFC00000U
+
+/* Get refcount and usagecount from buffer state */
+#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK)
+#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)
+
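
As a quick illustration of the packing (not part of the patch; buf stands for any BufferDesc pointer), a state word read once can be split into its three components with the macros above:

	uint32		buf_state = pg_atomic_read_u32(&buf->state);
	uint32		refcount = BUF_STATE_GET_REFCOUNT(buf_state);		/* low 18 bits */
	uint32		usagecount = BUF_STATE_GET_USAGECOUNT(buf_state);	/* next 4 bits */
	uint32		flags = buf_state & BUF_FLAG_MASK;					/* high 10 bits */
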
/*
* Flags for buffer descriptors
*
* Note: TAG_VALID essentially means that there is a buffer hashtable
* entry associated with the buffer's tag.
*/
-#define BM_DIRTY (1 << 0) /* data needs writing */
-#define BM_VALID (1 << 1) /* data is valid */
-#define BM_TAG_VALID (1 << 2) /* tag is assigned */
-#define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */
-#define BM_IO_ERROR (1 << 4) /* previous I/O failed */
-#define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */
-#define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */
-#define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */
-#define BM_PERMANENT (1 << 8) /* permanent relation (not
+#define BM_LOCKED (1U << 22) /* buffer header is locked */
+#define BM_DIRTY (1U << 23) /* data needs writing */
+#define BM_VALID (1U << 24) /* data is valid */
+#define BM_TAG_VALID (1U << 25) /* tag is assigned */
+#define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */
+#define BM_IO_ERROR (1U << 27) /* previous I/O failed */
+#define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */
+#define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */
+#define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */
+#define BM_PERMANENT (1U << 31) /* permanent relation (not
* unlogged) */
-
-typedef bits16 BufFlags;
-
/*
* The maximum allowed value of usage_count represents a tradeoff between
* accuracy and speed of the clock-sweep buffer management algorithm. A
/*
* BufferDesc -- shared descriptor/state data for a single shared buffer.
*
- * Note: buf_hdr_lock must be held to examine or change the tag, flags,
- * usage_count, refcount, or wait_backend_pid fields. buf_id field never
- * changes after initialization, so does not need locking. freeNext is
- * protected by the buffer_strategy_lock not buf_hdr_lock. The LWLock can
- * take care of itself. The buf_hdr_lock is *not* used to control access to
- * the data in the buffer!
+ * Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change
+ * the tag, state or wait_backend_pid fields.  In general, the buffer header
+ * lock is a spinlock that is combined with the flags, refcount and usagecount
+ * into a single atomic variable.  This layout allows us to perform some
+ * operations in a single atomic operation, without actually acquiring and
+ * releasing the spinlock; for instance, increasing or decreasing the
+ * refcount.  The buf_id field never changes after initialization, so it does
+ * not need locking.  freeNext is protected by the buffer_strategy_lock, not
+ * the buffer header lock.  The LWLock can take care
+ * of itself. The buffer header lock is *not* used to control access to the
+ * data in the buffer!
+ *
+ * It's assumed that nobody changes the state field while the buffer header
+ * lock is held.  Thus the buffer header lock holder can do complex updates of
+ * the state variable in a single write, simultaneously with the lock release
+ * (clearing the BM_LOCKED flag).  On the other hand, updating the state
+ * without holding the buffer header lock is restricted to CAS, which ensures
+ * that the BM_LOCKED flag is not set.  Atomic increment/decrement, OR/AND
+ * etc. are not allowed.
*
* An exception is that if we have the buffer pinned, its tag can't change
- * underneath us, so we can examine the tag without locking the spinlock.
+ * underneath us, so we can examine the tag without locking the buffer header.
* Also, in places we do one-time reads of the flags without bothering to
- * lock the spinlock; this is generally for situations where we don't expect
- * the flag bit being tested to be changing.
+ * lock the buffer header; this is generally for situations where we don't
+ * expect the flag bit being tested to be changing.
*
* We can't physically remove items from a disk page if another backend has
* the buffer pinned. Hence, a backend may need to wait for all other pins
typedef struct BufferDesc
{
BufferTag tag; /* ID of page contained in buffer */
- BufFlags flags; /* see bit definitions above */
- uint8 usage_count; /* usage counter for clock sweep code */
- slock_t buf_hdr_lock; /* protects a subset of fields, see above */
- unsigned refcount; /* # of backends holding pins on buffer */
- int wait_backend_pid; /* backend PID of pin-count waiter */
-
int buf_id; /* buffer's index number (from 0) */
+
+ /* state of the buffer, containing flags, refcount and usagecount */
+ pg_atomic_uint32 state;
+
+ int wait_backend_pid; /* backend PID of pin-count waiter */
int freeNext; /* link in freelist chain */
LWLock content_lock; /* to lock access to buffer contents */
#define FREENEXT_NOT_IN_LIST (-2)
/*
- * Macros for acquiring/releasing a shared buffer header's spinlock.
- * Do not apply these to local buffers!
+ * Functions for acquiring/releasing a shared buffer header's spinlock. Do
+ * not apply these to local buffers!
*/
-#define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
-#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock)
+extern uint32 LockBufHdr(BufferDesc *desc);
+#define UnlockBufHdr(desc, s) \
+ do { \
+ pg_write_barrier(); \
+ pg_atomic_write_u32(&(desc)->state, (s) & (~BM_LOCKED)); \
+ } while (0)
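
To make the rule above concrete, an unlocked update of the state has to follow the CAS pattern that PinBuffer, UnpinBuffer and MarkBufferDirty use in this patch; a minimal sketch, with set_flag_unlocked() as a hypothetical helper assumed to live in bufmgr.c next to WaitBufHdrUnlocked():

	/* Illustrative only: set a flag bit without taking the buffer header lock. */
	static void
	set_flag_unlocked(BufferDesc *buf, uint32 flag)
	{
		uint32		old_buf_state = pg_atomic_read_u32(&buf->state);
		uint32		buf_state;

		for (;;)
		{
			/* never CAS against a value with BM_LOCKED set */
			if (old_buf_state & BM_LOCKED)
				old_buf_state = WaitBufHdrUnlocked(buf);

			buf_state = old_buf_state | flag;

			/* fails, refreshing old_buf_state, if anyone changed the state */
			if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
											   buf_state))
				break;
		}
	}

The lock-holder path is the mirror image: LockBufHdr() returns the current state, the holder edits that local copy, and UnlockBufHdr() publishes it and clears BM_LOCKED in a single write, which is why the write barrier has to come before the store.
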
/*
extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag);
/* freelist.c */
-extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy);
+extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
+ uint32 *buf_state);
extern void StrategyFreeBuffer(BufferDesc *buf);
extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
BufferDesc *buf);