PGPROC *mq_sender;
uint64 mq_bytes_read;
uint64 mq_bytes_written;
- uint64 mq_ring_size;
+ Size mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
char mq_ring[FLEXIBLE_ARRAY_MEMBER];
* locally by copying the chunks into a backend-local buffer. mqh_buffer is
* the buffer, and mqh_buflen is the number of bytes allocated for it.
*
- * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
+ * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_length_word_complete
* are used to track the state of non-blocking operations. When the caller
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
* are expected to retry the call at a later time with the same argument;
* we need to retain enough state to pick up where we left off.
- * mqh_did_length_word tracks whether we read or wrote the length word,
- * mqh_partial_message_bytes tracks the number of payload bytes read or
- * written, and mqh_expected_bytes - which is used only for reads - tracks
- * the expected total size of the payload.
+ * mqh_length_word_complete tracks whether we are done sending or receiving
+ * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
+ * the number of bytes read or written for either the length word or the
+ * message itself, and mqh_expected_bytes - which is used only for reads -
+ * tracks the expected total size of the payload.
*
* mqh_counterparty_attached tracks whether we know the counterparty to have
* attached to the queue at some previous point. This lets us avoid some
dsm_segment *mqh_segment;
BackgroundWorkerHandle *mqh_handle;
char *mqh_buffer;
- uint64 mqh_buflen;
- uint64 mqh_consume_pending;
- uint64 mqh_partial_message_bytes;
- uint64 mqh_expected_bytes;
- bool mqh_did_length_word;
+ Size mqh_buflen;
+ Size mqh_consume_pending;
+ Size mqh_partial_bytes;
+ Size mqh_expected_bytes;
+ bool mqh_length_word_complete;
bool mqh_counterparty_attached;
MemoryContext mqh_context;
};
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
- void *data, bool nowait, uint64 *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
- bool nowait, uint64 *nbytesp, void **datap);
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+ void *data, bool nowait, Size *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
+ bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
shm_mq_create(void *address, Size size)
{
shm_mq *mq = address;
- uint64 data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+ Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
size = MAXALIGN_DOWN(size);
mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0;
mqh->mqh_context = CurrentMemoryContext;
- mqh->mqh_partial_message_bytes = 0;
- mqh->mqh_did_length_word = false;
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
mqh->mqh_counterparty_attached = false;
if (seg != NULL)
* the length or payload will corrupt the queue.)
*/
shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
{
shm_mq_result res;
shm_mq *mq = mqh->mqh_queue;
- uint64 bytes_written;
+ Size bytes_written;
Assert(mq->mq_sender == MyProc);
- /* Write the message length into the buffer. */
- if (!mqh->mqh_did_length_word)
+ /* Try to write, or finish writing, the length word into the buffer. */
+ while (!mqh->mqh_length_word_complete)
{
- res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
- &bytes_written);
+ Assert(mqh->mqh_partial_bytes < sizeof(Size));
+ res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+ ((char *) &nbytes) + mqh->mqh_partial_bytes,
+ nowait, &bytes_written);
+ mqh->mqh_partial_bytes += bytes_written;
if (res != SHM_MQ_SUCCESS)
return res;
- /*
- * We're sure to have sent the length in full, since we always
- * write a MAXALIGN'd chunk.
- */
- Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
- mqh->mqh_did_length_word = true;
+ if (mqh->mqh_partial_bytes >= sizeof(Size))
+ {
+ Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = true;
+ }
+
+ /* Length word can't be split unless bigger than required alignment. */
+ Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
}
/* Write the actual data bytes into the buffer. */
- Assert(mqh->mqh_partial_message_bytes <= nbytes);
- res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
- ((char *) data) + mqh->mqh_partial_message_bytes,
+ Assert(mqh->mqh_partial_bytes <= nbytes);
+ res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
+ ((char *) data) + mqh->mqh_partial_bytes,
nowait, &bytes_written);
if (res == SHM_MQ_WOULD_BLOCK)
- mqh->mqh_partial_message_bytes += bytes_written;
+ mqh->mqh_partial_bytes += bytes_written;
else
{
- mqh->mqh_partial_message_bytes = 0;
- mqh->mqh_did_length_word = false;
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
}
if (res != SHM_MQ_SUCCESS)
return res;
* function again after the process latch has been set.
*/
shm_mq_result
-shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
shm_mq *mq = mqh->mqh_queue;
shm_mq_result res;
- uint64 rb = 0;
- uint64 nbytes;
- uint64 needed;
+ Size rb = 0;
+ Size nbytes;
void *rawdata;
Assert(mq->mq_receiver == MyProc);
mqh->mqh_consume_pending = 0;
}
- /* Determine the message length. */
- if (mqh->mqh_did_length_word)
- {
- /* We've partially received a message; recall expected length. */
- nbytes = mqh->mqh_expected_bytes;
- }
- else
+ /* Try to read, or finish reading, the length word from the buffer. */
+ while (!mqh->mqh_length_word_complete)
{
/* Try to receive the message length word. */
- res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+ Assert(mqh->mqh_partial_bytes < sizeof(Size));
+ res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+ nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
- Assert(rb >= sizeof(uint64));
- memcpy(&nbytes, rawdata, sizeof(uint64));
- mqh->mqh_expected_bytes = nbytes;
- /* If we've already got the whole message, we're done. */
- needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
- if (rb >= needed)
+ /*
+ * Hopefully, we'll receive the entire message length word at once.
+ * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
+ * multiple reads.
+ */
+ if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
{
+ Size needed;
+
+ nbytes = * (Size *) rawdata;
+
+ /* If we've already got the whole message, we're done. */
+ needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
+ if (rb >= needed)
+ {
+ /*
+ * Technically, we could consume the message length information
+ * at this point, but the extra write to shared memory wouldn't
+ * be free and in most cases we would reap no benefit.
+ */
+ mqh->mqh_consume_pending = needed;
+ *nbytesp = nbytes;
+ *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
+ return SHM_MQ_SUCCESS;
+ }
+
/*
- * Technically, we could consume the message length information at
- * this point, but the extra write to shared memory wouldn't be
- * free and in most cases we would reap no benefit.
+ * We don't have the whole message, but we at least have the whole
+ * length word.
*/
- mqh->mqh_consume_pending = needed;
- *nbytesp = nbytes;
- *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
- return SHM_MQ_SUCCESS;
+ mqh->mqh_expected_bytes = nbytes;
+ mqh->mqh_length_word_complete = true;
+ shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+ rb -= MAXALIGN(sizeof(Size));
}
+ else
+ {
+ Size lengthbytes;
+
+ /* Can't be split unless bigger than required alignment. */
+ Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
- /* Consume the length word. */
- shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
- mqh->mqh_did_length_word = true;
- rb -= MAXALIGN64(sizeof(uint64));
+ /* Message word is split; need buffer to reassemble. */
+ if (mqh->mqh_buffer == NULL)
+ {
+ mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
+ MQH_INITIAL_BUFSIZE);
+ mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
+ }
+ Assert(mqh->mqh_buflen >= sizeof(Size));
+
+ /* Copy and consume partial length word. */
+ if (mqh->mqh_partial_bytes + rb > sizeof(Size))
+ lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
+ else
+ lengthbytes = rb - mqh->mqh_partial_bytes;
+ memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
+ lengthbytes);
+ mqh->mqh_partial_bytes += lengthbytes;
+ shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+ rb -= lengthbytes;
+
+ /* If we now have the whole word, we're ready to read payload. */
+ if (mqh->mqh_partial_bytes >= sizeof(Size))
+ {
+ Assert(mqh->mqh_partial_bytes == sizeof(Size));
+ mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer;
+ mqh->mqh_length_word_complete = true;
+ mqh->mqh_partial_bytes = 0;
+ }
+ }
}
+ nbytes = mqh->mqh_expected_bytes;
- if (mqh->mqh_partial_message_bytes == 0)
+ if (mqh->mqh_partial_bytes == 0)
{
/*
* Try to obtain the whole message in a single chunk. If this works,
return res;
if (rb >= nbytes)
{
- mqh->mqh_did_length_word = false;
- mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_consume_pending = MAXALIGN(nbytes);
*nbytesp = nbytes;
*datap = rawdata;
return SHM_MQ_SUCCESS;
*/
if (mqh->mqh_buflen < nbytes)
{
- uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+ Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
while (newbuflen < nbytes)
newbuflen *= 2;
/* Loop until we've copied the entire message. */
for (;;)
{
- uint64 still_needed;
+ Size still_needed;
/* Copy as much as we can. */
- Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
- memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
- mqh->mqh_partial_message_bytes += rb;
+ Assert(mqh->mqh_partial_bytes + rb <= nbytes);
+ memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
+ mqh->mqh_partial_bytes += rb;
/*
* Update count of bytes read, with alignment padding. Note
* end of a message, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read and write is as well.
*/
- Assert(mqh->mqh_partial_message_bytes == nbytes ||
- rb == MAXALIGN64(rb));
- shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+ Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
+ shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
/* If we got all the data, exit the loop. */
- if (mqh->mqh_partial_message_bytes >= nbytes)
+ if (mqh->mqh_partial_bytes >= nbytes)
break;
/* Wait for some more data. */
- still_needed = nbytes - mqh->mqh_partial_message_bytes;
+ still_needed = nbytes - mqh->mqh_partial_bytes;
res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
/* Return the complete message, and reset for next message. */
*nbytesp = nbytes;
*datap = mqh->mqh_buffer;
- mqh->mqh_did_length_word = false;
- mqh->mqh_partial_message_bytes = 0;
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_partial_bytes = 0;
return SHM_MQ_SUCCESS;
}
* Write bytes into a shared message queue.
*/
static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
- uint64 *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
+ Size *bytes_written)
{
shm_mq *mq = mqh->mqh_queue;
- uint64 sent = 0;
+ Size sent = 0;
uint64 used;
- uint64 ringsize = mq->mq_ring_size;
- uint64 available;
+ Size ringsize = mq->mq_ring_size;
+ Size available;
while (sent < nbytes)
{
res = shm_mq_notify_receiver(mq);
if (res != SHM_MQ_SUCCESS)
{
- *bytes_written = res;
+ *bytes_written = sent;
return res;
}
}
else
{
- uint64 offset = mq->mq_bytes_written % ringsize;
- uint64 sendnow = Min(available, ringsize - offset);
+ Size offset = mq->mq_bytes_written % (uint64) ringsize;
+ Size sendnow = Min(available, ringsize - offset);
/* Write as much data as we can via a single memcpy(). */
memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
* end of a run of bytes, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read is as well.
*/
- Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
- shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+ Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+ shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
/*
* For efficiency, we don't set the reader's latch here. We'll
* bytes_needed.
*/
static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
- uint64 *nbytesp, void **datap)
+shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+ Size *nbytesp, void **datap)
{
+ Size ringsize = mq->mq_ring_size;
uint64 used;
- uint64 ringsize = mq->mq_ring_size;
uint64 written;
for (;;)
{
- uint64 offset;
+ Size offset;
bool detached;
/* Get bytes written, so we can compute what's available to read. */
written = shm_mq_get_bytes_written(mq, &detached);
used = written - mq->mq_bytes_read;
Assert(used <= ringsize);
- offset = mq->mq_bytes_read % ringsize;
+ offset = mq->mq_bytes_read % (uint64) ringsize;
/* If we have enough data or buffer has wrapped, we're done. */
if (used >= bytes_needed || offset + used >= ringsize)
* Increment the number of bytes read.
*/
static void
-shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
{
PGPROC *sender;
* Increment the number of bytes written.
*/
static void
-shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
{
SpinLockAcquire(&mq->mq_mutex);
mq->mq_bytes_written += n;