#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/networkstream.h"
+#include "base/zlibstream.h"
#include "base/application.h"
#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
return;
}
- m_LogFile = boost::make_shared<StdioStream>(fp, true);
+ StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
+ m_LogFile = boost::make_shared<ZlibStream>(logStream);
m_LogMessageCount = 0;
m_LogMessageTimestamp = 0;
}
{
ASSERT(OwnsLock());
+ if (!m_LogFile)
+ return;
+
m_LogFile->Close();
m_LogFile.reset();
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
- StdioStream::Ptr lstream = boost::make_shared<StdioStream>(fp, true);
+ StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
+ ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
String message;
- while (NetString::ReadStringFromStream(lstream, &message)) {
+ while (true) {
+ try {
+ if (!NetString::ReadStringFromStream(lstream, &message))
+ break;
+ } catch (std::exception&) {
+ /* Log files may be incomplete or corrupted. This is perfectly OK. */
+ break;
+ }
+
Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
static void LogGlobHandler(std::vector<int>& files, const String& file);
void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
- StdioStream::Ptr m_LogFile;
+ Stream::Ptr m_LogFile;
double m_LogMessageTimestamp;
size_t m_LogMessageCount;
utility.h \
value.cpp \
value.h \
- win32.h
+ win32.h \
+ zlibstream.cpp \
+ zlibstream.h
libbase_la_CPPFLAGS = \
-DI2_BASE_BUILD \
#include "base/convert.h"
#include "base/debug.h"
#include "base/utility.h"
+#include "base/scriptvariable.h"
#include <sstream>
#include <iostream>
#include <boost/bind.hpp>
double avg_latency, max_latency;
double utilization = 0;
+ Value adaptive = ScriptVariable::Get("ThreadPoolAdaptive");
+
+ if (!adaptive.IsEmpty() && !static_cast<bool>(adaptive))
+ break;
+
{
boost::mutex::scoped_lock lock(m_Mutex);
#include "base/object.h"
#include "base/qstring.h"
#include "base/exception.h"
-#include <openssl/bio.h>
#include <openssl/ssl.h>
+#include <openssl/bio.h>
#include <openssl/err.h>
+#include <openssl/comp.h>
namespace icinga
{
--- /dev/null
+/******************************************************************************
+ * Icinga 2 *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
+ * *
+ * This program is free software; you can redistribute it and/or *
+ * modify it under the terms of the GNU General Public License *
+ * as published by the Free Software Foundation; either version 2 *
+ * of the License, or (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the Free Software Foundation *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ******************************************************************************/
+
+#include "base/zlibstream.h"
+#include "base/objectlock.h"
+#include "base/convert.h"
+#include "base/logger_fwd.h"
+#include <boost/make_shared.hpp>
+
+using namespace icinga;
+
+extern "C" BIO_METHOD *BIO_f_zlib(void);
+
+/**
+ * Constructor for the ZlibStream class.
+ *
+ * @param innerStream The inner stream.
+ * @param compress Whether we're compressing, false if we're decompressing.
+ */
+ZlibStream::ZlibStream(const Stream::Ptr& innerStream)
+ : m_InnerStream(innerStream)
+{
+ BIO *ibio = BIO_new_I2Stream(innerStream);
+ BIO *zbio = BIO_new(BIO_f_zlib());
+ m_BIO = BIO_push(zbio, ibio);
+}
+
+ZlibStream::~ZlibStream(void)
+{
+ Close();
+}
+
+size_t ZlibStream::Read(void *buffer, size_t size)
+{
+ ObjectLock olock(this);
+
+ return BIO_read(m_BIO, buffer, size);
+}
+
+void ZlibStream::Write(const void *buffer, size_t size)
+{
+ ObjectLock olock(this);
+
+ BIO_write(m_BIO, buffer, size);
+}
+
+void ZlibStream::Close(void)
+{
+ ObjectLock olock(this);
+
+ if (m_BIO) {
+ BIO_free_all(m_BIO);
+ m_BIO = NULL;
+
+ m_InnerStream->Close();
+ }
+}
+
+bool ZlibStream::IsEof(void) const
+{
+ ObjectLock olock(this);
+
+ return BIO_eof(m_BIO);
+}
--- /dev/null
+/******************************************************************************
+ * Icinga 2 *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
+ * *
+ * This program is free software; you can redistribute it and/or *
+ * modify it under the terms of the GNU General Public License *
+ * as published by the Free Software Foundation; either version 2 *
+ * of the License, or (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the Free Software Foundation *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ******************************************************************************/
+
+#ifndef ZLIBSTREAM_H
+#define ZLIBSTREAM_H
+
+#include "base/i2-base.h"
+#include "base/stream_bio.h"
+#include <iostream>
+
+namespace icinga {
+
+class ZlibStream : public Stream
+{
+public:
+ DECLARE_PTR_TYPEDEFS(ZlibStream);
+
+ ZlibStream(const Stream::Ptr& innerStream);
+ ~ZlibStream(void);
+
+ virtual size_t Read(void *buffer, size_t size);
+ virtual void Write(const void *buffer, size_t size);
+
+ virtual void Close(void);
+
+ virtual bool IsEof(void) const;
+
+private:
+ Stream::Ptr m_InnerStream;
+ BIO *m_BIO;
+};
+
+}
+
+#endif /* ZLIBSTREAM_H */