#include <unistd.h>
#include "threadname.hh"
#include "remote_logger.hh"
+#include <sys/uio.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "dolog.hh"
#endif
+void CircularWriteBuffer::write(const std::string& str)
+{
+ if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+ flush();
+
+ if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+ throw std::runtime_error("Full!");
+
+ uint16_t len = htons(str.size());
+ char* ptr = (char*)&len;
+ d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+ d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+}
+
+void CircularWriteBuffer::flush()
+{
+ if(d_buffer.empty()) // not optional, we report EOF otherwise
+ return;
+
+ auto arr1 = d_buffer.array_one();
+ auto arr2 = d_buffer.array_two();
+
+ struct iovec iov[2];
+ int pos=0;
+ size_t total=0;
+ for(const auto& arr : {arr1, arr2}) {
+ if(arr.second) {
+ iov[pos].iov_base = arr.first;
+ iov[pos].iov_len = arr.second;
+ total += arr.second;
+ ++pos;
+ }
+ }
+
+ int res = writev(d_fd, iov, pos);
+ if(res < 0) {
+ throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
+ }
+ if(!res) {
+ throw std::runtime_error("EOF");
+ }
+ // cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
+ if((size_t)res == d_buffer.size())
+ d_buffer.clear();
+ else {
+ while(res--)
+ d_buffer.pop_front();
+ }
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+{
+ if (!d_asyncConnect) {
+ if(reconnect())
+ d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+ }
+ d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+}
+
bool RemoteLogger::reconnect()
{
if (d_socket >= 0) {
close(d_socket);
d_socket = -1;
}
- d_connected = false;
try {
d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
setNonBlocking(d_socket);
#endif
return false;
}
- d_connected = true;
return true;
}
-void RemoteLogger::busyReconnectLoop()
+void RemoteLogger::queueData(const std::string& data)
{
- while (!reconnect()) {
- sleep(d_reconnectWaitTime);
+ if(!d_writer) {
+ d_drops++;
+ return;
+ }
+ std::unique_lock<std::mutex> lock(d_mutex);
+ if(d_writer) {
+ try {
+ d_writer->write(data);
+ }
+ catch(std::exception& e) {
+ // cout << "Got exception writing: "<<e.what()<<endl;
+ d_drops++;
+ d_writer.reset();
+ close(d_socket);
+ d_socket = -1;
+ }
}
}
-void RemoteLogger::worker()
+
+void RemoteLogger::maintenanceThread()
+try
{
#ifdef WE_ARE_RECURSOR
string threadName = "pdns-r/remLog";
string threadName = "dnsdist/remLog";
#endif
setThreadName(threadName);
- while(true) {
- std::string data;
- {
- std::unique_lock<std::mutex> lock(d_writeMutex);
- d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
- if (d_exiting) {
- return;
- }
- data = d_writeQueue.front();
- d_writeQueue.pop();
- }
- if (!d_connected) {
- busyReconnectLoop();
- }
+ for(;;) {
+ if(d_exiting)
+ break;
- try {
- uint16_t len = static_cast<uint16_t>(data.length());
- sendSizeAndMsgWithTimeout(d_socket, len, data.c_str(), static_cast<int>(d_timeout), nullptr, nullptr, 0, 0, 0);
- }
- catch(const std::runtime_error& e) {
-#ifdef WE_ARE_RECURSOR
- g_log<<Logger::Info<<"Error sending data to remote logger "<<d_remote.toStringWithPort()<<": "<< e.what()<<endl;
-#else
- vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
-#endif
- busyReconnectLoop();
+ if(d_writer) {
+ std::unique_lock<std::mutex> lock(d_mutex);
+ if(d_writer) { // check if it is still set
+ // cout<<"Flush"<<endl;
+ try {
+ d_writer->flush();
+ }
+ catch(std::exception& e) {
+ // cout<<"Flush failed!"<<endl;
+ d_writer.reset();
+ close(d_socket);
+ d_socket = -1;
+ }
+ }
}
- }
-}
-
-void RemoteLogger::queueData(const std::string& data)
-{
- {
- std::lock_guard<std::mutex> lock(d_writeMutex);
- if (d_writeQueue.size() >= d_maxQueuedEntries) {
- d_writeQueue.pop();
+ else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
+ std::unique_lock<std::mutex> lock(d_mutex);
+ d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
}
- d_writeQueue.push(data);
+ sleep(d_reconnectWaitTime);
}
- d_queueCond.notify_one();
}
-
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this)
+catch(std::exception& e)
{
- if (!d_asyncConnect) {
- reconnect();
- }
+ cerr<<"Thead died on: "<<e.what()<<endl;
}
RemoteLogger::~RemoteLogger()
if (d_socket >= 0) {
close(d_socket);
d_socket = -1;
- d_connected = false;
}
- d_queueCond.notify_one();
+
d_thread.join();
}
#include <thread>
#include "iputils.hh"
+#include <boost/circular_buffer.hpp>
+
+/* Writes can be submitted and they are atomically accepted. Either the whole write
+ ends up in the buffer or nothing ends up in the buffer.
+ In case nothing ends up in the buffer, an exception is thrown.
+ Similarly, EOF leads to this treatment
+
+ The filedescriptor can be in non-blocking mode.
+
+ This class is not threadsafe.
+*/
+
+class CircularWriteBuffer
+{
+public:
+ explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
+ {
+ }
+
+ void write(const std::string& str);
+ void flush();
+private:
+ int d_fd;
+ boost::circular_buffer<char> d_buffer;
+};
class RemoteLoggerInterface
{
virtual std::string toString() const = 0;
};
+/* Thread safe. Will connect asynchronously on request.
+ Runs a reconnection thread that also periodicall flushes.
+ Note that the buffer only runs as long as there is a connection.
+ If there is no connection we don't buffer a thing
+*/
class RemoteLogger : public RemoteLoggerInterface
{
public:
- RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
- virtual ~RemoteLogger();
- virtual void queueData(const std::string& data) override;
- virtual std::string toString() const override
+ RemoteLogger(const ComboAddress& remote, uint16_t timeout=2,
+ uint64_t maxQueuedBytes=100000,
+ uint8_t reconnectWaitTime=1,
+ bool asyncConnect=false);
+ ~RemoteLogger();
+ void queueData(const std::string& data) override;
+ std::string toString() const override
{
- return "RemoteLogger to " + d_remote.toStringWithPort();
+ return d_remote.toStringWithPort();
}
void stop()
{
d_exiting = true;
}
+ std::atomic<uint32_t> d_drops{0};
private:
- void busyReconnectLoop();
bool reconnect();
- void worker();
+ void maintenanceThread();
- std::queue<std::string> d_writeQueue;
- std::mutex d_writeMutex;
- std::condition_variable d_queueCond;
ComboAddress d_remote;
- uint64_t d_maxQueuedEntries;
+ uint64_t d_maxQueuedBytes;
int d_socket{-1};
+ std::unique_ptr<CircularWriteBuffer> d_writer;
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
std::atomic<bool> d_exiting{false};
+
bool d_asyncConnect{false};
- bool d_connected{false};
std::thread d_thread;
+ std::mutex d_mutex;
};