From: Gunnar Beutner Date: Sun, 15 Jul 2012 22:02:31 +0000 (+0200) Subject: Cleaned up TcpClient interface. X-Git-Tag: v0.0.1~226 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=802fc15969cceef6495d51fba8dd1681ee593025;p=icinga2 Cleaned up TcpClient interface. --- diff --git a/base/Makefile.am b/base/Makefile.am index 824f5a61d..2e0a786b5 100644 --- a/base/Makefile.am +++ b/base/Makefile.am @@ -21,6 +21,7 @@ libbase_la_SOURCES = \ fifo.cpp \ fifo.h \ i2-base.h \ + ioqueue.h \ logger.cpp \ logger.h \ object.cpp \ diff --git a/base/asynctask.h b/base/asynctask.h index b674f94cf..47b57dabf 100644 --- a/base/asynctask.h +++ b/base/asynctask.h @@ -69,7 +69,7 @@ public: try { Run(); - } catch (const exception& ex) { + } catch (const exception&) { FinishException(boost::current_exception()); } } diff --git a/base/base.vcxproj b/base/base.vcxproj index 216a75ff4..72cfeb18c 100644 --- a/base/base.vcxproj +++ b/base/base.vcxproj @@ -51,6 +51,7 @@ + diff --git a/base/base.vcxproj.filters b/base/base.vcxproj.filters index 4d4526411..fa329bab1 100644 --- a/base/base.vcxproj.filters +++ b/base/base.vcxproj.filters @@ -180,6 +180,9 @@ Headerdateien + + Headerdateien + diff --git a/base/fifo.cpp b/base/fifo.cpp index 1a5caa65a..e71c2f4e1 100644 --- a/base/fifo.cpp +++ b/base/fifo.cpp @@ -93,11 +93,9 @@ void FIFO::Optimize(void) } /** - * Returns the number of bytes that are contained in the FIFO. - * - * @returns The number of bytes. + * Implements IOQueue::GetAvailableBytes(). */ -size_t FIFO::GetSize(void) const +size_t FIFO::GetAvailableBytes(void) const { return m_DataSize; } @@ -107,32 +105,33 @@ size_t FIFO::GetSize(void) const * * @returns Pointer to the read buffer. */ -const void *FIFO::GetReadBuffer(void) const +/*const void *FIFO::GetReadBuffer(void) const { return m_Buffer + m_Offset; -} +}*/ /** - * Reads data from the FIFO and places it in the specified buffer. - * - * @param buffer The buffer where the data should be placed (can be NULL if - * the reader is not interested in the data). - * @param count The number of bytes to read. - * @returns The number of bytes read which may be less than what was requested. + * Implements IOQueue::Peek. */ -size_t FIFO::Read(void *buffer, size_t count) +void FIFO::Peek(void *buffer, size_t count) { - count = (count <= m_DataSize) ? count : m_DataSize; + assert(m_DataSize >= count); if (buffer != NULL) memcpy(buffer, m_Buffer + m_Offset, count); +} + +/** + * Implements IOQueue::Read. + */ +void FIFO::Read(void *buffer, size_t count) +{ + Peek(buffer, count); m_DataSize -= count; m_Offset += count; Optimize(); - - return count; } /** @@ -142,31 +141,20 @@ size_t FIFO::Read(void *buffer, size_t count) * contains the actual size of the available buffer which can * be larger than the requested size. */ -void *FIFO::GetWriteBuffer(size_t *count) +/*void *FIFO::GetWriteBuffer(size_t *count) { ResizeBuffer(m_Offset + m_DataSize + *count); *count = m_AllocSize - m_Offset - m_DataSize; return m_Buffer + m_Offset + m_DataSize; -} +}*/ /** - * Writes data to the FIFO. - * - * @param buffer The data that is to be written (can be NULL if the writer has - * already filled the write buffer, e.g. via GetWriteBuffer()). - * @param count The number of bytes to write. - * @returns The number of bytes written + * Implements IOQueue::Write. */ -size_t FIFO::Write(const void *buffer, size_t count) +void FIFO::Write(const void *buffer, size_t count) { - if (buffer != NULL) { - size_t bufferSize = count; - void *target_buffer = GetWriteBuffer(&bufferSize); - memcpy(target_buffer, buffer, count); - } - + ResizeBuffer(m_Offset + m_DataSize + count); + memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count); m_DataSize += count; - - return count; } diff --git a/base/fifo.h b/base/fifo.h index 485eaa2c1..f472833e9 100644 --- a/base/fifo.h +++ b/base/fifo.h @@ -28,7 +28,7 @@ namespace icinga * * @ingroup base */ -class I2_BASE_API FIFO : public Object +class I2_BASE_API FIFO : public Object, public IOQueue { public: static const size_t BlockSize = 16 * 1024; @@ -39,13 +39,13 @@ public: FIFO(void); ~FIFO(void); - size_t GetSize(void) const; + /*const void *GetReadBuffer(void) const; + void *GetWriteBuffer(size_t *count);*/ - const void *GetReadBuffer(void) const; - void *GetWriteBuffer(size_t *count); - - size_t Read(void *buffer, size_t count); - size_t Write(const void *buffer, size_t count); + virtual size_t GetAvailableBytes(void) const; + virtual void Peek(void *buffer, size_t count); + virtual void Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); private: char *m_Buffer; diff --git a/base/i2-base.h b/base/i2-base.h index 0d46d028c..e953777cf 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -161,6 +161,7 @@ using boost::system_time; #include "dictionary.h" #include "ringbuffer.h" #include "timer.h" +#include "ioqueue.h" #include "fifo.h" #include "socket.h" #include "tcpsocket.h" diff --git a/base/ioqueue.h b/base/ioqueue.h new file mode 100644 index 000000000..11fd03035 --- /dev/null +++ b/base/ioqueue.h @@ -0,0 +1,54 @@ +#ifndef IOQUEUE_H +#define IOQUEUE_H + +namespace icinga +{ + +/** + * An I/O queue. + */ +class IOQueue +{ +public: + /** + * Retrieves the number of bytes available for reading. + * + * @returns The number of available bytes. + */ + virtual size_t GetAvailableBytes(void) const = 0; + + /** + * Reads data from the queue without advancing the read pointer. Trying + * to read more data than is available in the queue is a programming error. + * Use GetBytesAvailable() to check how much data is available. + * + * @buffer The buffer where data should be stored. May be NULL if you're + * not actually interested in the data. + * @param count The number of bytes to read from the queue. + */ + virtual void Peek(void *buffer, size_t count) = 0; + + /** + * Reads data from the queue. Trying to read more data than is + * available in the queue is a programming error. Use GetBytesAvailable() + * to check how much data is available. + * + * @param buffer The buffer where data should be stored. May be NULL if you're + * not actually interested in the data. + * @param count The number of bytes to read from the queue. + */ + virtual void Read(void *buffer, size_t count) = 0; + + /** + * Writes data to the queue. + * + * @param buffer The data that is to be written. + * @param count The number of bytes to write. + * @returns The number of bytes written + */ + virtual void Write(const void *buffer, size_t count) = 0; +}; + +} + +#endif /* IOQUEUE_H */ \ No newline at end of file diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp index 3f95783e3..fcfdb233d 100644 --- a/base/tcpclient.cpp +++ b/base/tcpclient.cpp @@ -103,46 +103,79 @@ void TcpClient::Connect(const string& node, const string& service) "Could not create a suitable socket.")); } +void TcpClient::HandleWritable(void) +{ + int rc; + char data[1024]; + size_t count; + + for (;;) { + count = m_SendQueue->GetAvailableBytes(); + + if (count == 0) + break; + + if (count > sizeof(data)) + count = sizeof(data); + + m_SendQueue->Peek(data, count); + + rc = send(GetFD(), (const char *)data, count, 0); + + if (rc <= 0) { + HandleSocketError(SocketException("send() failed", GetError())); + return; + } + + m_SendQueue->Read(NULL, rc); + } +} + /** - * Retrieves the send queue for the socket. - * - * @returns The send queue. + * Implements IOQueue::GetAvailableBytes. */ -FIFO::Ptr TcpClient::GetSendQueue(void) +size_t TcpClient::GetAvailableBytes(void) const { - return m_SendQueue; + mutex::scoped_lock lock(GetMutex()); + + return m_RecvQueue->GetAvailableBytes(); } -void TcpClient::HandleWritable(void) +/** + * Implements IOQueue::Peek. + */ +void TcpClient::Peek(void *buffer, size_t count) { - int rc; + mutex::scoped_lock lock(GetMutex()); - rc = send(GetFD(), (const char *)m_SendQueue->GetReadBuffer(), m_SendQueue->GetSize(), 0); + m_RecvQueue->Peek(buffer, count); +} - if (rc <= 0) { - HandleSocketError(SocketException("send() failed", GetError())); - return; - } +/** + * Implements IOQueue::Read. + */ +void TcpClient::Read(void *buffer, size_t count) +{ + mutex::scoped_lock lock(GetMutex()); - m_SendQueue->Read(NULL, rc); + m_RecvQueue->Read(buffer, count); } /** - * Retrieves the recv queue for the socket. - * - * @returns The recv queue. + * Implements IOQueue::Write. */ -FIFO::Ptr TcpClient::GetRecvQueue(void) +void TcpClient::Write(const void *buffer, size_t count) { - return m_RecvQueue; + mutex::scoped_lock lock(GetMutex()); + + m_SendQueue->Write(buffer, count); } void TcpClient::HandleReadable(void) { for (;;) { - size_t bufferSize = FIFO::BlockSize / 2; - char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize); - int rc = recv(GetFD(), buffer, bufferSize, 0); + char data[1024]; + int rc = recv(GetFD(), data, sizeof(data), 0); #ifdef _WIN32 if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) @@ -156,7 +189,7 @@ void TcpClient::HandleReadable(void) return; } - m_RecvQueue->Write(NULL, rc); + m_RecvQueue->Write(data, rc); } Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); @@ -179,7 +212,7 @@ bool TcpClient::WantsToRead(void) const */ bool TcpClient::WantsToWrite(void) const { - return (m_SendQueue->GetSize() > 0); + return (m_SendQueue->GetAvailableBytes() > 0); } /** diff --git a/base/tcpclient.h b/base/tcpclient.h index 479d61734..32e918d25 100644 --- a/base/tcpclient.h +++ b/base/tcpclient.h @@ -41,7 +41,7 @@ enum TcpClientRole * * @ingroup base */ -class I2_BASE_API TcpClient : public TcpSocket +class I2_BASE_API TcpClient : public TcpSocket, public IOQueue { public: typedef shared_ptr Ptr; @@ -53,11 +53,13 @@ public: void Connect(const string& node, const string& service); - FIFO::Ptr GetSendQueue(void); - FIFO::Ptr GetRecvQueue(void); - boost::signal OnDataAvailable; + virtual size_t GetAvailableBytes(void) const; + virtual void Peek(void *buffer, size_t count); + virtual void Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); + protected: virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; @@ -65,11 +67,11 @@ protected: virtual void HandleReadable(void); virtual void HandleWritable(void); -private: - TcpClientRole m_Role; - FIFO::Ptr m_SendQueue; FIFO::Ptr m_RecvQueue; + +private: + TcpClientRole m_Role; }; /** diff --git a/base/tlsclient.cpp b/base/tlsclient.cpp index 7ddb76ce0..593bfed85 100644 --- a/base/tlsclient.cpp +++ b/base/tlsclient.cpp @@ -117,9 +117,8 @@ void TlsClient::HandleReadable(void) result = 0; for (;;) { - size_t bufferSize = FIFO::BlockSize / 2; - char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize); - int rc = SSL_read(m_SSL.get(), buffer, bufferSize); + char data[1024]; + int rc = SSL_read(m_SSL.get(), data, sizeof(data)); if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { @@ -138,7 +137,7 @@ void TlsClient::HandleReadable(void) } } - GetRecvQueue()->Write(NULL, rc); + m_RecvQueue->Write(data, rc); } post_event: @@ -153,26 +152,41 @@ void TlsClient::HandleWritable(void) m_BlockRead = false; m_BlockWrite = false; - int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize()); - - if (rc <= 0) { - switch (SSL_get_error(m_SSL.get(), rc)) { - case SSL_ERROR_WANT_READ: - m_BlockWrite = true; - /* fall through */ - case SSL_ERROR_WANT_WRITE: - return; - case SSL_ERROR_ZERO_RETURN: - CloseInternal(false); - return; - default: - HandleSocketError(OpenSSLException( - "SSL_write failed", ERR_get_error())); - return; + char data[1024]; + size_t count; + + for (;;) { + count = m_SendQueue->GetAvailableBytes(); + + if (count == 0) + break; + + if (count > sizeof(data)) + count = sizeof(data); + + m_SendQueue->Peek(data, count); + + int rc = SSL_write(m_SSL.get(), (const char *)data, count); + + if (rc <= 0) { + switch (SSL_get_error(m_SSL.get(), rc)) { + case SSL_ERROR_WANT_READ: + m_BlockWrite = true; + /* fall through */ + case SSL_ERROR_WANT_WRITE: + return; + case SSL_ERROR_ZERO_RETURN: + CloseInternal(false); + return; + default: + HandleSocketError(OpenSSLException( + "SSL_write failed", ERR_get_error())); + return; + } } - } - GetSendQueue()->Read(NULL, rc); + m_SendQueue->Read(NULL, rc); + } } /**