unsigned wait_for_welcome:1; /* no server yet in pool */
unsigned ready:1; /* server accepts new query */
- unsigned flush_req:1; /* client requested flush */
unsigned admin_user:1;
unsigned own_user:1; /* is console client with same uid */
/* admin conn, waits for completion of PAUSE/SUSPEND cmd */
unsigned wait_for_response:1;
- /* this (server0 socket must be closed ASAP */
+ /* this (server) socket must be closed ASAP */
unsigned close_needed:1;
usec_t connect_time; /* when connection was made */
{
unsigned pkt_type;
unsigned pkt_len;
- bool flush = 0;
SBuf *sbuf = &client->sbuf;
if (!get_header(pkt, &pkt_type, &pkt_len)) {
/* request immidiate response from server */
case 'H': /* Flush */
- client->flush_req = 1;
case 'S': /* Sync */
- /* sync is followed by ReadyForQuery */
/* one-packet queries */
case 'Q': /* Query */
case 'c': /* CopyDone(F/B) */
case 'f': /* CopyFail(F/B) */
- /* above packets should be sent ASAP */
- flush = 1;
-
/*
* extended protocol allows server (and thus pooler)
* to buffer packets until sync or flush is sent by client
client->link->ready = 0;
/* forward the packet */
- sbuf_prepare_send(sbuf, &client->link->sbuf, pkt_len, flush);
+ sbuf_prepare_send(sbuf, &client->link->sbuf, pkt_len);
break;
/* client wants to go away */
sk->wait_for_welcome = 0;
sk->ready = 0;
- sk->flush_req = 0;
sk->admin_user = 0;
sk->own_user = 0;
sk->suspended = 0;
sbuf->dst = NULL;
sbuf->sock = 0;
sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0;
- sbuf->pkt_skip = sbuf->wait_send = sbuf->pkt_flush = 0;
+ sbuf->pkt_skip = sbuf->wait_send = 0;
sbuf->send_pos = sbuf->send_remain = 0;
}
/* proto_fn tells to send some bytes to socket */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush)
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount)
{
AssertActive(sbuf);
Assert(sbuf->pkt_remain == 0);
Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
- Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
Assert(amount > 0);
sbuf->pkt_skip = 0;
sbuf->pkt_remain = amount;
- sbuf->pkt_flush = flush;
sbuf->dst = dst;
}
AssertActive(sbuf);
Assert(sbuf->pkt_remain == 0);
Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
- Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
Assert(amount > 0);
sbuf->pkt_skip = 1;
sbuf->pkt_remain = amount;
- sbuf->pkt_flush = 0;
sbuf->dst = NULL;
}
return;
AssertSanity(sbuf);
+ Assert(sbuf->wait_send);
/* prepare normal situation for sbuf_main_loop */
sbuf->wait_send = 0;
/*
* If start of packet, process packet header.
- *
- * Dont append anything to flush packets, send them first.
*/
- if (sbuf->pkt_remain == 0 && !sbuf->pkt_flush) {
- /* if flush then send it before looking */
-
+ if (sbuf->pkt_remain == 0) {
res = sbuf_call_proto(sbuf, SBUF_EV_READ);
if (!res)
return false;
sbuf->pkt_pos += avail;
/* send data */
- if (sbuf->pkt_skip || sbuf->pkt_flush) {
+ if (sbuf->pkt_skip) {
res = sbuf_send_pending(sbuf);
if (!res)
return false;
* but with skip_recv switch its should not be needed anymore.
*/
free = cf_sbuf_len - sbuf->recv_pos;
- ok = sbuf_actual_recv(sbuf, free);
- if (!ok)
- return;
+ if (free > 0) {
+ ok = sbuf_actual_recv(sbuf, free);
+ if (!ok)
+ return;
+ }
skip_recv:
/* now handle it */
int recv_pos;
int pkt_pos;
- int pkt_remain;
int send_pos;
- int send_remain;
- unsigned wait_send:1;
- unsigned pkt_skip:1;
- unsigned pkt_flush:1;
- unsigned is_unix:1;
+ int pkt_remain; /* total packet length remaining */
+ int send_remain; /* total data to be sent remaining */
+
+ unsigned pkt_skip:1; /* if current packet should be skipped */
+ unsigned is_unix:1; /* is it unix socket */
+ unsigned wait_send:1; /* debug var, otherwise useless */
uint8 buf[0];
};
void sbuf_close(SBuf *sbuf);
/* proto_fn can use those functions to order behaviour */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush);
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount);
void sbuf_prepare_skip(SBuf *sbuf, int amount);
bool sbuf_answer(SBuf *sbuf, const void *buf, int len);
{
unsigned pkt_type;
unsigned pkt_len;
- bool flush = 0;
bool ready = 0;
char state;
SBuf *sbuf = &server->sbuf;
return false;
}
- /* above packers need to be sent immidiately */
- flush = 1;
-
/*
* 'E' and 'N' packets currently set ->ready to 0. Correct would
* be to leave ->ready as-is, because overal TX state stays same.
case 's': /* PortalSuspended */
case 'C': /* CommandComplete */
- /* check if client wanted immidiate response */
- if (client && client->flush_req) {
- flush = 1;
- client->flush_req = 0;
- }
-
/* data packets, there will be more coming */
case 'd': /* CopyData(F/B) */
case 'D': /* DataRow */
case 'T': /* RowDescription */
if (client) {
- sbuf_prepare_send(sbuf, &client->sbuf, pkt_len, flush);
+ sbuf_prepare_send(sbuf, &client->sbuf, pkt_len);
} else {
if (server->state != SV_TESTED)
log_warning("got packet '%c' from server"
} DbConn;
static char *bulk_data;
-static int bulk_data_max = 128*1024; /* power of 2 */
+static int bulk_data_max = 16*1024; /* power of 2 */
/* fill mem with random junk */
static void init_bulk_data(void)