From 46846433a03dff4f2e08c8a161e54a842da360d6 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Thu, 14 Oct 2021 16:06:43 -0400
Subject: [PATCH] shm_mq: Update mq_bytes_written less often.

Do not update shm_mq's mq_bytes_written until we have written
an amount of data greater than 1/4th of the ring size, unless
the caller of shm_mq_send(v) requests a flush at the end of
the message.  This reduces the number of calls to SetLatch(),
and also the number of CPU cache misses, considerably, and thus
makes shm_mq significantly faster.

Dilip Kumar, reviewed by Zhihong Yu and Tomas Vondra. Some minor
cosmetic changes by me.

Discussion: http://postgr.es/m/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com
---
 src/backend/executor/tqueue.c         |  2 +-
 src/backend/libpq/pqmq.c              |  7 ++-
 src/backend/storage/ipc/shm_mq.c      | 62 ++++++++++++++++++++++-----
 src/include/storage/shm_mq.h          |  8 ++--
 src/test/modules/test_shm_mq/test.c   |  7 +--
 src/test/modules/test_shm_mq/worker.c |  2 +-
 6 files changed, 69 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 7af9fbe9848..eb0cbd7b217 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the tuple itself. */
 	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
 
 	if (should_free)
 		pfree(tuple);
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index d1a1f47a788..846494bf44c 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	for (;;)
 	{
-		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+		/*
+		 * Immediately notify the receiver by passing force_flush as true so
+		 * that the shared memory value is updated before we send the parallel
+		 * message signal right after this.
+		 */
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
 		if (pq_mq_parallel_leader_pid != 0)
 			SendProcSignal(pq_mq_parallel_leader_pid,
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093e033..b4ce9629d4c 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -109,6 +109,12 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_send_pending is the number of bytes that have been written to the
+ * queue but not yet reflected in shared memory.  We don't update shared
+ * memory until the pending bytes exceed 1/4th of the ring size or the
+ * queue is full.  This avoids frequent CPU cache misses and also avoids
+ * frequent SetLatch() calls, which are quite expensive.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
 	Size		mqh_consume_pending;
+	Size		mqh_send_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_buffer = NULL;
 	mqh->mqh_buflen = 0;
 	mqh->mqh_consume_pending = 0;
+	mqh->mqh_send_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
 	mqh->mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
  * Write a message into a shared message queue.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+			bool force_flush)
 {
 	shm_mq_iovec iov;
 
 	iov.data = data;
 	iov.len = nbytes;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+	return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver (if it is already attached).  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+			 bool force_flush)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * If the caller has requested force flush or we have written more than
+	 * 1/4 of the ring size, mark it as written in shared memory and notify
+	 * the receiver.
+	 */
+	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+	{
+		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+		SetLatch(&receiver->procLatch);
+		mqh->mqh_send_pending = 0;
+	}
+
 	return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Before detaching, notify the receiver about any already-written data. */
+	if (mqh->mqh_send_pending > 0)
+	{
+		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+		mqh->mqh_send_pending = 0;
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 		/* Compute number of ring buffer bytes used and available. */
 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
+			/* Update the pending send bytes in shared memory. */
+			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
 			/*
 			 * Since mq->mqh_counterparty_attached is known to be true at this
 			 * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			Assert(mqh->mqh_counterparty_attached);
 			SetLatch(&mq->mq_receiver->procLatch);
 
+			/*
+			 * We have just updated the pending bytes in shared memory, so
+			 * reset mqh_send_pending.
+			 */
+			mqh->mqh_send_pending = 0;
+
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
 			{
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here.  We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we don't update the bytes written in shared
+			 * memory, and we don't set the reader's latch here either.
+			 * Refer to the comments atop the shm_mq_handle structure for
+			 * more information.
 			 */
+			mqh->mqh_send_pending += MAXALIGN(sendnow);
 		}
 	}
 
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f7600..cb1c555656c 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-								 Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
-								  shm_mq_iovec *iov, int iovcnt, bool nowait);
+								 Size nbytes, const void *data, bool nowait,
+								 bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+								  int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 									Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c
index 2d8d695f97a..be074f08a31 100644
--- a/src/test/modules/test_shm_mq/test.c
+++ b/src/test/modules/test_shm_mq/test.c
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 
 	/* Send the initial message. */
-	res = shm_mq_send(outqh, message_size, message_contents, false);
+	res = shm_mq_send(outqh, message_size, message_contents, false, true);
 	if (res != SHM_MQ_SUCCESS)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 		 */
 		if (send_count < loop_count)
 		{
-			res = shm_mq_send(outqh, message_size, message_contents, true);
+			res = shm_mq_send(outqh, message_size, message_contents, true,
+							  true);
 			if (res == SHM_MQ_SUCCESS)
 			{
 				++send_count;
diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c
index 2180776a669..9b037b98fe7 100644
--- a/src/test/modules/test_shm_mq/worker.c
+++ b/src/test/modules/test_shm_mq/worker.c
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			break;
 	}
-- 
2.30.2
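
For illustration, here is a minimal usage sketch of the new force_flush
argument (not part of the patch; the helper send_all and its parameters
are hypothetical, while shm_mq_send's signature is as declared above).
Intermediate messages pass force_flush = false, so mq_bytes_written is
updated lazily, and only the final message passes force_flush = true so
the receiver is woken immediately:

	#include "postgres.h"
	#include "storage/shm_mq.h"

	/* Hypothetical helper: stream a batch of messages, flushing at the end. */
	static shm_mq_result
	send_all(shm_mq_handle *outqh, int nmsgs, char **msgs, Size *lens)
	{
		shm_mq_result res = SHM_MQ_SUCCESS;

		for (int i = 0; i < nmsgs; i++)
		{
			bool		last = (i == nmsgs - 1);

			/*
			 * Only the last message forces an update of mq_bytes_written
			 * and a SetLatch(); earlier sends merely accumulate into
			 * mqh_send_pending, unless the queue fills up or more than
			 * 1/4th of the ring becomes pending.
			 */
			res = shm_mq_send(outqh, lens[i], msgs[i], false, last);
			if (res != SHM_MQ_SUCCESS)
				break;			/* e.g., the receiver detached */
		}
		return res;
	}

With a 64kB ring, for example, such a sender touches mq_bytes_written and
calls SetLatch() roughly once per 16kB streamed rather than once per
message, which is where the SetLatch() and cache-miss savings described
in the commit message come from.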