try {
Socket::Ptr client = server->Accept();
Log(LogNotice, "LivestatusListener", "Client connected");
- Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client));
+ Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client), LowLatencyScheduler);
} catch (std::exception&) {
Log(LogCritical, "ListenerListener", "Cannot accept new connection.");
}
* Appends a work item to the work queue. Work items will be processed in FIFO order.
*
* @param callback The callback function for the work item.
+ * @param policy The scheduling policy
* @returns true if the item was queued, false otherwise.
*/
-bool ThreadPool::Post(const ThreadPool::WorkFunction& callback)
+bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy)
{
WorkItem wi;
wi.Callback = callback;
if (queue.Stopped)
return false;
+ if (policy == LowLatencyScheduler)
+ queue.SpawnWorker(m_ThreadGroup);
+
queue.Items.push_back(wi);
queue.CV.notify_one();
}
boost::mutex::scoped_lock lock(m_MgmtMutex);
if (!m_Stopped)
- m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5));
+ m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500));
if (m_Stopped)
break;
int tthreads = wthreads - alive;
/* Make sure there is at least one thread per CPU */
- int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 1U);
+ int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 4U);
if (alive + tthreads < ncput)
tthreads = ncput - alive;
#define QUEUECOUNT 4
+enum SchedulerPolicy
+{
+ DefaultScheduler,
+ LowLatencyScheduler
+};
+
/**
* A thread pool.
*
void Stop(void);
void Join(bool wait_for_stop = false);
- bool Post(const WorkFunction& callback);
+ bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler);
private:
enum ThreadState
#endif /* _WIN32 */
}
-void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
+void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy)
{
- Application::GetTP().Post(callback);
+ Application::GetTP().Post(callback, policy);
}
String Utility::NaturalJoin(const std::vector<String>& tokens)
#include <boost/function.hpp>
#include <boost/thread/tss.hpp>
#include <vector>
+#include "base/threadpool.hpp"
namespace icinga
{
static bool MkDir(const String& path, int flags);
static bool MkDirP(const String& path, int flags);
- static void QueueAsyncCallback(const boost::function<void (void)>& callback);
+ static void QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy = DefaultScheduler);
static String NaturalJoin(const std::vector<String>& tokens);
for (;;) {
try {
Socket::Ptr client = server->Accept();
- Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
+ Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer), LowLatencyScheduler);
} catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Cannot accept new connection.");
}