}
template<class T>
-int ObjectPipe<T>::readTimeout(T* t, int msec)
+int ObjectPipe<T>::readTimeout(T* t, double msec)
{
T* ptr;
- int ret = waitForData(d_fds[0], 0, 1000*msec);
- if(ret <0)
- unixDie("waiting for data in object pipe");
- if(ret == 0)
- return -1;
+ if(msec != 0) {
+ int ret = waitForData(d_fds[0], 0, 1000*msec);
+ if(ret < 0)
+ unixDie("waiting for data in object pipe");
+ if(ret == 0)
+ return -1;
+ }
- ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+ int ret = ::read(d_fds[0], &ptr, sizeof(ptr));
if(ret < 0)
unixDie("read");
d_thread.join();
}
+
+
template<class T>
void DelayPipe<T>::worker()
{
Combo c;
for(;;) {
- int ret = d_pipe.readTimeout(&c, 10); // XXXX NEEDS TO BE DYNAMIC
- if(ret > 0) { // we got an object
- d_work.insert(make_pair(c.when, c.what));
- }
- else if(ret==0) { // timeout
-
- break;
+ double delay=-1; // infinite
+ struct timespec now;
+ if(!d_work.empty()) {
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ delay=1000*tsdelta(d_work.begin()->first, now);
+ if(delay < 0) {
+ delay=0; // don't wait
+ }
}
- else {
- // cout<<"Got a timeout"<<endl;
+ if(delay != 0 ) {
+ int ret = d_pipe.readTimeout(&c, delay);
+ if(ret > 0) { // we got an object
+ d_work.insert(make_pair(c.when, c.what));
+ }
+ else if(ret==0) { // EOF
+ break;
+ }
+ else {
+ ;
+ }
+ clock_gettime(CLOCK_MONOTONIC, &now);
}
- struct timespec now;
- clock_gettime(CLOCK_MONOTONIC, &now);
+
tscomp cmp;
+
for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful
if(cmp(iter->first, now)) {
iter->second();
d_work.erase(iter++);
}
- else break;
+ else {
+ break;
+ }
}
}
}
~ObjectPipe();
void write(T& t);
bool read(T* t); // returns false on EOF
- int readTimeout(T* t, int msec); // -1 is timeout, 0 is no data, 1 is data
+ int readTimeout(T* t, double msec); // -1 is timeout, 0 is no data, 1 is data
void close();
private:
int d_fds[2];
struct timespec when;
};
+ double tsdelta(const struct timespec& a, const struct timespec& b) // read as a-b
+ {
+ return 1.0*(a.tv_sec-b.tv_sec)+1.0*(a.tv_nsec-b.tv_nsec)/1000000000.0;
+ }
+
ObjectPipe<Combo> d_pipe;
struct tscomp {
bool operator()(const struct timespec& a, const struct timespec& b) const