return true;
}
+static void sbuf_recv_forced_cb(int sock, short flags, void *arg)
+{
+ SBuf *sbuf = arg;
+
+ sbuf->wait_type = W_NONE;
+
+ if (sbuf_wait_for_data(sbuf)) {
+ sbuf_recv_cb(sock, flags, arg);
+ } else {
+ sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+ }
+}
+
+static bool sbuf_wait_for_data_forced(SBuf *sbuf)
+{
+ int err;
+ struct timeval tv_min;
+
+ tv_min.tv_sec = 0;
+ tv_min.tv_usec = 1;
+
+ if (sbuf->wait_type != W_NONE) {
+ event_del(&sbuf->ev);
+ sbuf->wait_type = W_NONE;
+ }
+
+ event_set(&sbuf->ev, sbuf->sock, EV_READ, sbuf_recv_forced_cb, sbuf);
+ err = event_add(&sbuf->ev, &tv_min);
+ if (err < 0) {
+ log_warning("sbuf_wait_for_data: event_add failed: %s", strerror(errno));
+ return false;
+ }
+ sbuf->wait_type = W_ONCE;
+ return true;
+}
+
/* libevent EV_WRITE: called when dest socket is writable again */
static void sbuf_send_cb(int sock, short flags, void *arg)
{
* after resync to process all data. (result is ignored)
*/
ok = sbuf_process_pending(sbuf);
+
+ sbuf_wait_for_data_forced(sbuf);
return;
}
loopcnt++;