int ObjectPipe<T>::readTimeout(T* t, double msec)
{
T* ptr;
- if(msec != 0) {
- int ret = waitForData(d_fds[0], 0, 1000*msec);
- if(ret < 0)
- unixDie("waiting for data in object pipe");
- if(ret == 0)
- return -1;
- }
- int ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+ int ret = waitForData(d_fds[0], 0, 1000*msec);
+ if(ret < 0)
+ unixDie("waiting for data in object pipe");
+ if(ret == 0)
+ return -1;
+
+ ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING!
if(ret < 0)
unixDie("read");
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
now.tv_nsec += msec*1e6;
- if(now.tv_nsec > 1e9) {
+ while(now.tv_nsec > 1e9) {
now.tv_sec++;
now.tv_nsec-=1e9;
}
{
Combo c;
for(;;) {
+ /* this code is slightly too subtle, but I don't see how it could be any simpler.
+ So we have a set of work to do, and we need to wait until the time arrives to do it.
+ Simultaneously new work might come in. So we try to combine both of these things by
+ setting a timeout on listening to the pipe over which new work comes in. This timeout
+ is equal to the wait until the first thing that needs to be done.
+
+ Two additional cases exist: we have no work to wait for, so we can wait infinitely long.
+ The other special case is that the first we have to do.. is in the past, so we need to do it
+ immediately. */
+
+
double delay=-1; // infinite
struct timespec now;
if(!d_work.empty()) {
clock_gettime(CLOCK_MONOTONIC, &now);
delay=1000*tsdelta(d_work.begin()->first, now);
if(delay < 0) {
- delay=0; // don't wait
+ delay=0; // don't wait - we have work that is late already!
}
}
if(delay != 0 ) {
~ObjectPipe();
void write(T& t);
bool read(T* t); // returns false on EOF
- int readTimeout(T* t, double msec); // -1 is timeout, 0 is no data, 1 is data
+ int readTimeout(T* t, double msec); //!< -1 is timeout, 0 is no data, 1 is data. msec<0 waits infinitely wrong. msec==0 = undefined
void close();
private:
int d_fds[2];
public:
DelayPipe();
~DelayPipe();
- void submit(T& t, int msec);
+ void submit(T& t, int msec); //!< don't try for more than 4294 msec
private:
std::thread d_thread;
};
int done=0;
-BOOST_AUTO_TEST_CASE(test_delay_pipe) {
-
+BOOST_AUTO_TEST_CASE(test_delay_pipe_small) {
struct Work
{
int i;
};
+BOOST_AUTO_TEST_CASE(test_delay_pipe_big) {
+ done=0;
+ struct Work
+ {
+ int i;
+ void operator()()
+ {
+ ++done;
+ }
+ };
+ DelayPipe<Work> dp;
+ int n;
+ for(n=0; n < 1000000; ++n) {
+ Work w{n};
+ dp.submit(w, 100);
+ }
+
+ sleep(1);
+ BOOST_CHECK_EQUAL(done, n);
+};
+
BOOST_AUTO_TEST_SUITE_END();