#include "dnsmessage.pb.h"
#endif
-void RemoteLogger::reconnect()
+bool RemoteLogger::reconnect()
{
if (d_socket >= 0) {
close(d_socket);
setNonBlocking(d_socket);
}
catch(const std::exception& e) {
- infolog("Error connecting to %s: %s", d_remote.toStringWithPort(), e.what());
+ infolog("Error connecting to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+ return false;
}
+ return true;
}
bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
return true;
}
+void RemoteLogger::worker()
+{
+ while(true) {
+ std::string data;
+ {
+ std::unique_lock<std::mutex> lock(d_writeMutex);
+ d_queueCond.wait(lock, [this]{return !d_writeQueue.empty();});
+ data = d_writeQueue.front();
+ d_writeQueue.pop();
+ }
+
+ try {
+ uint32_t len = htonl(data.length());
+ writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout);
+ writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout);
+ }
+ catch(const std::runtime_error& e) {
+ vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+
+ while (!reconnect()) {
+ sleep(d_reconnectWaitTime);
+ }
+ }
+ }
+}
+
+void RemoteLogger::queueData(const std::string& data)
+{
+ {
+ std::unique_lock<std::mutex> lock(d_writeMutex);
+ if (d_writeQueue.size() >= d_maxQueuedEntries) {
+ d_writeQueue.pop();
+ }
+ d_writeQueue.push(data);
+ }
+ d_queueCond.notify_one();
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this)
+{
+#ifdef HAVE_PROTOBUF
+ reconnect();
+#else
+ throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled.");
+#endif /* HAVE_PROTOBUF */
+}
+
+RemoteLogger::~RemoteLogger()
+{
+ if (d_socket >= 0)
+ close(d_socket);
+}
+
void RemoteLogger::logQuery(const DNSQuestion& dq)
{
#ifdef HAVE_PROTOBUF
//cerr <<message.DebugString()<<endl;
std::string str;
message.SerializeToString(&str);
- uint32_t len = htonl(str.length());
- if (sendData((const char*) &len, sizeof(len)))
- sendData(str.c_str(), str.length());
+ queueData(str);
message.release_question();
#endif /* HAVE_PROTOBUF */
}
//cerr <<message.DebugString()<<endl;
std::string str;
message.SerializeToString(&str);
- uint32_t len = htonl(str.length());
- if (sendData((const char*) &len, sizeof(len)))
- sendData(str.c_str(), str.length());
+ queueData(str);
message.release_response();
#endif /* HAVE_PROTOBUF */
}
#pragma once
#include "config.h"
+#include <condition_variable>
+#include <queue>
+
class RemoteLogger
{
public:
- RemoteLogger(const ComboAddress& remote): d_remote(remote)
- {
-#ifdef HAVE_PROTOBUF
- reconnect();
-#else
- throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled.");
-#endif /* HAVE_PROTOBUF */
- }
- ~RemoteLogger()
- {
- if (d_socket >= 0)
- close(d_socket);
- }
+ RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1);
+ ~RemoteLogger();
void logQuery(const DNSQuestion& dq);
void logResponse(const DNSQuestion& dr);
std::string toString()
return d_remote.toStringWithPort();
}
private:
- void reconnect();
+ bool reconnect();
bool sendData(const char* buffer, size_t bufferSize);
+ void worker();
+ void queueData(const std::string& data);
+ std::queue<std::string> d_writeQueue;
+ std::mutex d_writeMutex;
+ std::condition_variable d_queueCond;
ComboAddress d_remote;
+ uint64_t d_maxQueuedEntries;
int d_socket{-1};
+ uint16_t d_timeout;
+ uint8_t d_reconnectWaitTime;
+ std::thread d_thread;
};