]> granicus.if.org Git - icinga2/commitdiff
Cleaned up TcpClient interface.
authorGunnar Beutner <gunnar.beutner@netways.de>
Sun, 15 Jul 2012 22:02:31 +0000 (00:02 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Sun, 15 Jul 2012 22:02:31 +0000 (00:02 +0200)
base/Makefile.am
base/asynctask.h
base/base.vcxproj
base/base.vcxproj.filters
base/fifo.cpp
base/fifo.h
base/i2-base.h
base/ioqueue.h [new file with mode: 0644]
base/tcpclient.cpp
base/tcpclient.h
base/tlsclient.cpp

index 824f5a61db084a4f4146e4ab88d00377e17807dc..2e0a786b550ffe29037085a2714babf7d82e2de2 100644 (file)
@@ -21,6 +21,7 @@ libbase_la_SOURCES =  \
        fifo.cpp \
        fifo.h \
        i2-base.h \
+       ioqueue.h \
        logger.cpp \
        logger.h \
        object.cpp \
index b674f94cfdab4281904e2890bfb8d44170c6d00f..47b57dabf91dcd79006c9015b520d3f8cf8e7e4f 100644 (file)
@@ -69,7 +69,7 @@ public:
 
                try {
                        Run();
-               } catch (const exception& ex) {
+               } catch (const exception&) {
                        FinishException(boost::current_exception());
                }
        }
index 216a75ff41d4db85c2872d2300fd6b96ca2292e0..72cfeb18c884ad8781e81c7325ae0a05d4753f43 100644 (file)
@@ -51,6 +51,7 @@
     <ClInclude Include="configobject.h" />
     <ClInclude Include="dictionary.h" />
     <ClInclude Include="event.h" />
+    <ClInclude Include="ioqueue.h" />
     <ClInclude Include="scriptfunction.h" />
     <ClInclude Include="scripttask.h" />
     <ClInclude Include="logger.h" />
index 4d45264112a63942493bec6a301958b5f9f4f5f1..fa329bab16c7f43190399220577ae554a70414b4 100644 (file)
     <ClInclude Include="scriptfunction.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
+    <ClInclude Include="ioqueue.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Quelldateien">
index 1a5caa65ac971602ce911d0afb4760a2fc14eb4c..e71c2f4e1865eb945edc203901b46d3a63dc5455 100644 (file)
@@ -93,11 +93,9 @@ void FIFO::Optimize(void)
 }
 
 /**
- * Returns the number of bytes that are contained in the FIFO.
- *
- * @returns The number of bytes.
+ * Implements IOQueue::GetAvailableBytes().
  */
-size_t FIFO::GetSize(void) const
+size_t FIFO::GetAvailableBytes(void) const
 {
        return m_DataSize;
 }
@@ -107,32 +105,33 @@ size_t FIFO::GetSize(void) const
  *
  * @returns Pointer to the read buffer.
  */
-const void *FIFO::GetReadBuffer(void) const
+/*const void *FIFO::GetReadBuffer(void) const
 {
        return m_Buffer + m_Offset;
-}
+}*/
 
 /**
- * Reads data from the FIFO and places it in the specified buffer.
- *
- * @param buffer The buffer where the data should be placed (can be NULL if
- *               the reader is not interested in the data).
- * @param count The number of bytes to read.
- * @returns The number of bytes read which may be less than what was requested.
+ * Implements IOQueue::Peek.
  */
-size_t FIFO::Read(void *buffer, size_t count)
+void FIFO::Peek(void *buffer, size_t count)
 {
-       count = (count <= m_DataSize) ? count : m_DataSize;
+       assert(m_DataSize >= count);
 
        if (buffer != NULL)
                memcpy(buffer, m_Buffer + m_Offset, count);
+}
+
+/**
+ * Implements IOQueue::Read.
+ */
+void FIFO::Read(void *buffer, size_t count)
+{
+       Peek(buffer, count);
 
        m_DataSize -= count;
        m_Offset += count;
 
        Optimize();
-
-       return count;
 }
 
 /**
@@ -142,31 +141,20 @@ size_t FIFO::Read(void *buffer, size_t count)
  *              contains the actual size of the available buffer which can
  *              be larger than the requested size.
  */
-void *FIFO::GetWriteBuffer(size_t *count)
+/*void *FIFO::GetWriteBuffer(size_t *count)
 {
        ResizeBuffer(m_Offset + m_DataSize + *count);
        *count = m_AllocSize - m_Offset - m_DataSize;
 
        return m_Buffer + m_Offset + m_DataSize;
-}
+}*/
 
 /**
- * Writes data to the FIFO.
- *
- * @param buffer The data that is to be written (can be NULL if the writer has
- *               already filled the write buffer, e.g. via GetWriteBuffer()).
- * @param count The number of bytes to write.
- * @returns The number of bytes written
+ * Implements IOQueue::Write.
  */
-size_t FIFO::Write(const void *buffer, size_t count)
+void FIFO::Write(const void *buffer, size_t count)
 {
-       if (buffer != NULL) {
-               size_t bufferSize = count;
-               void *target_buffer = GetWriteBuffer(&bufferSize);
-               memcpy(target_buffer, buffer, count);
-       }
-
+       ResizeBuffer(m_Offset + m_DataSize + count);
+       memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
        m_DataSize += count;
-
-       return count;
 }
index 485eaa2c1e3388ed8261b7a0ed0d894729f0db59..f472833e962d2cc279accf518867db037bed7355 100644 (file)
@@ -28,7 +28,7 @@ namespace icinga
  *
  * @ingroup base
  */
-class I2_BASE_API FIFO : public Object
+class I2_BASE_API FIFO : public Object, public IOQueue
 {
 public:
        static const size_t BlockSize = 16 * 1024;
@@ -39,13 +39,13 @@ public:
        FIFO(void);
        ~FIFO(void);
 
-       size_t GetSize(void) const;
+       /*const void *GetReadBuffer(void) const;
+       void *GetWriteBuffer(size_t *count);*/
 
-       const void *GetReadBuffer(void) const;
-       void *GetWriteBuffer(size_t *count);
-
-       size_t Read(void *buffer, size_t count);
-       size_t Write(const void *buffer, size_t count);
+       virtual size_t GetAvailableBytes(void) const;
+       virtual void Peek(void *buffer, size_t count);
+       virtual void Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
 
 private:
        char *m_Buffer;
index 0d46d028cc7fd9b22455e994a8ddd620ace0f3be..e953777cfbb1572e351daf67675cb6207dfc58f8 100644 (file)
@@ -161,6 +161,7 @@ using boost::system_time;
 #include "dictionary.h"
 #include "ringbuffer.h"
 #include "timer.h"
+#include "ioqueue.h"
 #include "fifo.h"
 #include "socket.h"
 #include "tcpsocket.h"
diff --git a/base/ioqueue.h b/base/ioqueue.h
new file mode 100644 (file)
index 0000000..11fd030
--- /dev/null
@@ -0,0 +1,54 @@
+#ifndef IOQUEUE_H
+#define IOQUEUE_H
+
+namespace icinga
+{
+
+/**
+ * An I/O queue.
+ */
+class IOQueue
+{
+public:
+       /**
+        * Retrieves the number of bytes available for reading.
+        *
+        * @returns The number of available bytes.
+        */
+       virtual size_t GetAvailableBytes(void) const = 0;
+
+       /**
+        * Reads data from the queue without advancing the read pointer. Trying
+        * to read more data than is available in the queue is a programming error.
+        * Use GetBytesAvailable() to check how much data is available.
+        *
+        * @buffer The buffer where data should be stored. May be NULL if you're
+        *               not actually interested in the data.
+        * @param count The number of bytes to read from the queue.
+        */
+       virtual void Peek(void *buffer, size_t count) = 0;
+
+       /**
+        * Reads data from the queue. Trying to read more data than is
+        * available in the queue is a programming error. Use GetBytesAvailable()
+        * to check how much data is available.
+        *
+        * @param buffer The buffer where data should be stored. May be NULL if you're
+        *               not actually interested in the data.
+        * @param count The number of bytes to read from the queue.
+        */
+       virtual void Read(void *buffer, size_t count) = 0;
+
+       /**
+        * Writes data to the queue.
+        *
+        * @param buffer The data that is to be written.
+        * @param count The number of bytes to write.
+        * @returns The number of bytes written
+        */
+       virtual void Write(const void *buffer, size_t count) = 0;
+};
+
+}
+
+#endif /* IOQUEUE_H */
\ No newline at end of file
index 3f95783e3965e5e43b6005fa977ec635fb21cf73..fcfdb233db20cbf5506d2c7b6717142eb38d8451 100644 (file)
@@ -103,46 +103,79 @@ void TcpClient::Connect(const string& node, const string& service)
                    "Could not create a suitable socket."));
 }
 
+void TcpClient::HandleWritable(void)
+{
+       int rc;
+       char data[1024];
+       size_t count;
+
+       for (;;) {
+               count = m_SendQueue->GetAvailableBytes();
+
+               if (count == 0)
+                       break;
+
+               if (count > sizeof(data))
+                       count = sizeof(data);
+
+               m_SendQueue->Peek(data, count);
+
+               rc = send(GetFD(), (const char *)data, count, 0);
+
+               if (rc <= 0) {
+                       HandleSocketError(SocketException("send() failed", GetError()));
+                       return;
+               }
+
+               m_SendQueue->Read(NULL, rc);
+       }
+}
+
 /**
- * Retrieves the send queue for the socket.
- *
- * @returns The send queue.
+ * Implements IOQueue::GetAvailableBytes.
  */
-FIFO::Ptr TcpClient::GetSendQueue(void)
+size_t TcpClient::GetAvailableBytes(void) const
 {
-       return m_SendQueue;
+       mutex::scoped_lock lock(GetMutex());
+
+       return m_RecvQueue->GetAvailableBytes();
 }
 
-void TcpClient::HandleWritable(void)
+/**
+ * Implements IOQueue::Peek.
+ */
+void TcpClient::Peek(void *buffer, size_t count)
 {
-       int rc;
+       mutex::scoped_lock lock(GetMutex());
 
-       rc = send(GetFD(), (const char *)m_SendQueue->GetReadBuffer(), m_SendQueue->GetSize(), 0);
+       m_RecvQueue->Peek(buffer, count);
+}
 
-       if (rc <= 0) {
-               HandleSocketError(SocketException("send() failed", GetError()));
-               return;
-       }
+/**
+ * Implements IOQueue::Read.
+ */
+void TcpClient::Read(void *buffer, size_t count)
+{
+       mutex::scoped_lock lock(GetMutex());
 
-       m_SendQueue->Read(NULL, rc);
+       m_RecvQueue->Read(buffer, count);
 }
 
 /**
- * Retrieves the recv queue for the socket.
- *
- * @returns The recv queue.
+ * Implements IOQueue::Write.
  */
-FIFO::Ptr TcpClient::GetRecvQueue(void)
+void TcpClient::Write(const void *buffer, size_t count)
 {
-       return m_RecvQueue;
+       mutex::scoped_lock lock(GetMutex());
+
+       m_SendQueue->Write(buffer, count);
 }
 
 void TcpClient::HandleReadable(void)
 {
        for (;;) {
-               size_t bufferSize = FIFO::BlockSize / 2;
-               char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
-               int rc = recv(GetFD(), buffer, bufferSize, 0);
+               char data[1024];
+               int rc = recv(GetFD(), data, sizeof(data), 0);
 
        #ifdef _WIN32
                if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
@@ -156,7 +189,7 @@ void TcpClient::HandleReadable(void)
                        return;
                }
 
-               m_RecvQueue->Write(NULL, rc);
+               m_RecvQueue->Write(data, rc);
        }
 
        Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
@@ -179,7 +212,7 @@ bool TcpClient::WantsToRead(void) const
  */
 bool TcpClient::WantsToWrite(void) const
 {
-       return (m_SendQueue->GetSize() > 0);
+       return (m_SendQueue->GetAvailableBytes() > 0);
 }
 
 /**
index 479d61734b0400ebc3c01079775ca8388fcfa377..32e918d25e3468bf4363159f95a5d5339a114a08 100644 (file)
@@ -41,7 +41,7 @@ enum TcpClientRole
  *
  * @ingroup base
  */
-class I2_BASE_API TcpClient : public TcpSocket
+class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
 {
 public:
        typedef shared_ptr<TcpClient> Ptr;
@@ -53,11 +53,13 @@ public:
 
        void Connect(const string& node, const string& service);
 
-       FIFO::Ptr GetSendQueue(void);
-       FIFO::Ptr GetRecvQueue(void);
-
        boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
 
+       virtual size_t GetAvailableBytes(void) const;
+       virtual void Peek(void *buffer, size_t count);
+       virtual void Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
+
 protected:
        virtual bool WantsToRead(void) const;
        virtual bool WantsToWrite(void) const;
@@ -65,11 +67,11 @@ protected:
        virtual void HandleReadable(void);
        virtual void HandleWritable(void);
 
-private:
-       TcpClientRole m_Role;
-
        FIFO::Ptr m_SendQueue;
        FIFO::Ptr m_RecvQueue;
+
+private:
+       TcpClientRole m_Role;
 };
 
 /**
index 7ddb76ce0dff7c1f3da2ab357a2b4c7454eddb2e..593bfed855b20494f2510aa485937f91d6b53cdb 100644 (file)
@@ -117,9 +117,8 @@ void TlsClient::HandleReadable(void)
        result = 0;
 
        for (;;) {
-               size_t bufferSize = FIFO::BlockSize / 2;
-               char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
-               int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
+               char data[1024];
+               int rc = SSL_read(m_SSL.get(), data, sizeof(data));
 
                if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
@@ -138,7 +137,7 @@ void TlsClient::HandleReadable(void)
                        }
                }
 
-               GetRecvQueue()->Write(NULL, rc);
+               m_RecvQueue->Write(data, rc);
        }
 
 post_event:
@@ -153,26 +152,41 @@ void TlsClient::HandleWritable(void)
        m_BlockRead = false;
        m_BlockWrite = false;
 
-       int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
-
-       if (rc <= 0) {
-               switch (SSL_get_error(m_SSL.get(), rc)) {
-                       case SSL_ERROR_WANT_READ:
-                               m_BlockWrite = true;
-                               /* fall through */
-                       case SSL_ERROR_WANT_WRITE:
-                               return;
-                       case SSL_ERROR_ZERO_RETURN:
-                               CloseInternal(false);
-                               return;
-                       default:
-                               HandleSocketError(OpenSSLException(
-                                   "SSL_write failed", ERR_get_error()));
-                               return;
+       char data[1024];
+       size_t count;
+
+       for (;;) {
+               count = m_SendQueue->GetAvailableBytes();
+
+               if (count == 0)
+                       break;
+
+               if (count > sizeof(data))
+                       count = sizeof(data);
+
+               m_SendQueue->Peek(data, count);
+
+               int rc = SSL_write(m_SSL.get(), (const char *)data, count);
+
+               if (rc <= 0) {
+                       switch (SSL_get_error(m_SSL.get(), rc)) {
+                               case SSL_ERROR_WANT_READ:
+                                       m_BlockWrite = true;
+                                       /* fall through */
+                               case SSL_ERROR_WANT_WRITE:
+                                       return;
+                               case SSL_ERROR_ZERO_RETURN:
+                                       CloseInternal(false);
+                                       return;
+                               default:
+                                       HandleSocketError(OpenSSLException(
+                                           "SSL_write failed", ERR_get_error()));
+                                       return;
+                       }
                }
-       }
 
-       GetSendQueue()->Read(NULL, rc);
+               m_SendQueue->Read(NULL, rc);
+       }
 }
 
 /**