*/
Socket::~Socket(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(true);
}
*/
void Socket::Close(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(false);
}
*/
String Socket::GetClientAddress(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
sockaddr_storage sin;
socklen_t len = sizeof(sin);
*/
String Socket::GetPeerAddress(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
sockaddr_storage sin;
socklen_t len = sizeof(sin);
void Socket::ReadThreadProc(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
for (;;) {
fd_set readfds, exceptfds;
void Socket::WriteThreadProc(void)
{
- mutex::scoped_lock lock(m_Mutex);
+ mutex::scoped_lock lock(m_SocketMutex);
for (;;) {
fd_set writefds;
}
}
-mutex& Socket::GetMutex(void) const
-{
- return m_Mutex;
-}
-
void Socket::SetConnected(bool connected)
{
m_Connected = connected;
String GetClientAddress(void);
String GetPeerAddress(void);
- mutex& GetMutex(void) const;
-
bool IsConnected(void) const;
void CheckException(void);
virtual void CloseInternal(bool from_dtor);
+ mutable mutex m_SocketMutex;
+
private:
SOCKET m_FD; /**< The socket descriptor. */
bool m_Connected;
condition_variable m_WriteCV;
- mutable mutex m_Mutex;
boost::exception_ptr m_Exception;
void ReadThreadProc(void);
}
for (;;) {
- count = m_SendQueue->GetAvailableBytes();
+ {
+ mutex::scoped_lock lock(m_QueueMutex);
- if (count == 0)
- break;
+ count = m_SendQueue->GetAvailableBytes();
- if (count > sizeof(data))
- count = sizeof(data);
+ if (count == 0)
+ break;
- m_SendQueue->Peek(data, count);
+ if (count > sizeof(data))
+ count = sizeof(data);
+
+ m_SendQueue->Peek(data, count);
+ }
rc = send(GetFD(), (const char *)data, count, 0);
if (rc <= 0)
throw_exception(SocketException("send() failed", GetError()));
- m_SendQueue->Read(NULL, rc);
+ {
+ mutex::scoped_lock lock(m_QueueMutex);
+ m_SendQueue->Read(NULL, rc);
+ }
}
}
*/
size_t TcpClient::GetAvailableBytes(void) const
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_QueueMutex);
return m_RecvQueue->GetAvailableBytes();
}
*/
void TcpClient::Peek(void *buffer, size_t count)
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Peek(buffer, count);
}
*/
void TcpClient::Read(void *buffer, size_t count)
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Read(buffer, count);
}
*/
void TcpClient::Write(const void *buffer, size_t count)
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Write(buffer, count);
}
if (rc <= 0)
throw_exception(SocketException("recv() failed", GetError()));
- m_RecvQueue->Write(data, rc);
+ {
+ mutex::scoped_lock lock(m_QueueMutex);
+
+ m_RecvQueue->Write(data, rc);
+ }
}
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
*/
bool TcpClient::WantsToWrite(void) const
{
- return (m_SendQueue->GetAvailableBytes() > 0 || !IsConnected());
+ {
+ mutex::scoped_lock lock(m_QueueMutex);
+
+ if (m_SendQueue->GetAvailableBytes() > 0)
+ return true;
+ }
+
+ return (!IsConnected());
}
/**
virtual void HandleReadable(void);
virtual void HandleWritable(void);
+ mutable mutex m_QueueMutex;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
*/
shared_ptr<X509> TlsClient::GetClientCertificate(void) const
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
}
*/
shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
{
- mutex::scoped_lock lock(GetMutex());
+ mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
}
}
- if (IsConnected())
+ if (IsConnected()) {
+ mutex::scoped_lock lock(m_QueueMutex);
+
m_RecvQueue->Write(data, rc);
+ }
}
post_event:
int rc;
if (IsConnected()) {
- count = m_SendQueue->GetAvailableBytes();
+ {
+ mutex::scoped_lock lock(m_QueueMutex);
+
+ count = m_SendQueue->GetAvailableBytes();
- if (count == 0)
- break;
+ if (count == 0)
+ break;
- if (count > sizeof(data))
- count = sizeof(data);
+ if (count > sizeof(data))
+ count = sizeof(data);
- m_SendQueue->Peek(data, count);
+ m_SendQueue->Peek(data, count);
+ }
rc = SSL_write(m_SSL.get(), (const char *)data, count);
} else {
}
}
- if (IsConnected())
+ if (IsConnected()) {
+ mutex::scoped_lock lock(m_QueueMutex);
+
m_SendQueue->Read(NULL, rc);
+ }
}
}