unsigned int const payload = MIN(next->length, bytes_transferred);
/* For uTP sockets, the overhead is computed in utp_on_overhead. */
- unsigned int const overhead = io->socket != TR_BAD_SOCKET ? guessPacketOverhead(payload) : 0;
+ unsigned int const overhead = io->socket.type == TR_PEER_SOCKET_TYPE_TCP ? guessPacketOverhead(payload) : 0;
uint64_t const now = tr_time_msec();
tr_bandwidthUsed(&io->bandwidth, TR_UP, payload, next->isPieceData, now);
tr_peerIo* io = vio;
TR_ASSERT(tr_isPeerIo(io));
- TR_ASSERT(io->socket != TR_BAD_SOCKET);
+ TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);
int res;
int e;
tr_peerIo* io = vio;
TR_ASSERT(tr_isPeerIo(io));
- TR_ASSERT(io->socket != TR_BAD_SOCKET);
+ TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);
int res = 0;
int e;
#endif /* #ifdef WITH_UTP */
static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
- uint8_t const* torrentHash, bool isIncoming, bool isSeed, tr_socket_t socket, struct UTPSocket* utp_socket)
+ uint8_t const* torrentHash, bool isIncoming, bool isSeed, struct tr_peer_socket const socket)
{
TR_ASSERT(session != NULL);
TR_ASSERT(session->events != NULL);
TR_ASSERT(tr_amInEventThread(session));
- TR_ASSERT((socket == TR_BAD_SOCKET) == (utp_socket != NULL));
-#ifndef WITH_UTP
- TR_ASSERT(socket != TR_BAD_SOCKET);
+
+#ifdef WITH_UTP
+ TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP || socket.type == TR_PEER_SOCKET_TYPE_UTP);
+#else
+ TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP);
#endif
- if (socket != TR_BAD_SOCKET)
+ if (socket.type == TR_PEER_SOCKET_TYPE_TCP)
{
- tr_netSetTOS(socket, session->peerSocketTOS);
- maybeSetCongestionAlgorithm(socket, session->peer_congestion_algorithm);
+ tr_netSetTOS(socket.handle.tcp, session->peerSocketTOS);
+ maybeSetCongestionAlgorithm(socket.handle.tcp, session->peer_congestion_algorithm);
}
tr_peerIo* io = tr_new0(tr_peerIo, 1);
io->isSeed = isSeed;
io->port = port;
io->socket = socket;
- io->utp_socket = utp_socket;
io->isIncoming = isIncoming;
io->timeCreated = tr_time();
io->inbuf = evbuffer_new();
tr_bandwidthConstruct(&io->bandwidth, session, parent);
tr_bandwidthSetPeer(&io->bandwidth, io);
dbgmsg(io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent);
- dbgmsg(io, "socket is %" PRIdMAX ", utp_socket is %p", (intmax_t)socket, (void*)utp_socket);
- if (io->socket != TR_BAD_SOCKET)
+ switch (socket.type)
{
- io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io);
- io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io);
- }
+ case TR_PEER_SOCKET_TYPE_TCP:
+ dbgmsg(io, "socket (tcp) is %" PRIdMAX, (intmax_t)socket.handle.tcp);
+ io->event_read = event_new(session->event_base, socket.handle.tcp, EV_READ, event_read_cb, io);
+ io->event_write = event_new(session->event_base, socket.handle.tcp, EV_WRITE, event_write_cb, io);
+ break;
#ifdef WITH_UTP
- else
- {
- UTP_SetSockopt(utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
+ case TR_PEER_SOCKET_TYPE_UTP:
+ dbgmsg(io, "socket (utp) is %p", (void*)socket.handle.utp);
+ UTP_SetSockopt(socket.handle.utp, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
dbgmsg(io, "%s", "calling UTP_SetCallbacks &utp_function_table");
- UTP_SetCallbacks(utp_socket, &utp_function_table, io);
+ UTP_SetCallbacks(socket.handle.utp, &utp_function_table, io);
if (!isIncoming)
{
dbgmsg(io, "%s", "calling UTP_Connect");
- UTP_Connect(utp_socket);
+ UTP_Connect(socket.handle.utp);
}
- }
+
+ break;
#endif
+ default:
+ TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
+ }
+
return io;
}
-tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, tr_socket_t fd,
- struct UTPSocket* utp_socket)
+tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
+ struct tr_peer_socket const socket)
{
TR_ASSERT(session != NULL);
TR_ASSERT(tr_address_is_valid(addr));
- return tr_peerIoNew(session, parent, addr, port, NULL, true, false, fd, utp_socket);
+ return tr_peerIoNew(session, parent, addr, port, NULL, true, false, socket);
}
tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
TR_ASSERT(tr_address_is_valid(addr));
TR_ASSERT(torrentHash != NULL);
- tr_socket_t fd = TR_BAD_SOCKET;
- struct UTPSocket* utp_socket = NULL;
+ struct tr_peer_socket socket = TR_PEER_SOCKET_INIT;
if (utp)
{
- utp_socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
+ socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
}
- if (utp_socket == NULL)
+ if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
{
- fd = tr_netOpenPeerSocket(session, addr, port, isSeed);
- dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)fd);
+ socket = tr_netOpenPeerSocket(session, addr, port, isSeed);
+ dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)(socket.type != TR_PEER_SOCKET_TYPE_NONE ?
+ socket.handle.tcp : TR_BAD_SOCKET));
}
- if (fd == TR_BAD_SOCKET && utp_socket == NULL)
+ if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
{
return NULL;
}
- return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, fd, utp_socket);
+ return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, socket);
}
/***
TR_ASSERT(io->session != NULL);
TR_ASSERT(io->session->events != NULL);
- if (io->socket != TR_BAD_SOCKET)
+ bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
+
+ if (need_events)
{
TR_ASSERT(event_initialized(io->event_read));
TR_ASSERT(event_initialized(io->event_write));
{
dbgmsg(io, "enabling ready-to-read polling");
- if (io->socket != TR_BAD_SOCKET)
+ if (need_events)
{
event_add(io->event_read, NULL);
}
{
dbgmsg(io, "enabling ready-to-write polling");
- if (io->socket != TR_BAD_SOCKET)
+ if (need_events)
{
event_add(io->event_write, NULL);
}
TR_ASSERT(io->session != NULL);
TR_ASSERT(io->session->events != NULL);
- if (io->socket != TR_BAD_SOCKET)
+ bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
+
+ if (need_events)
{
TR_ASSERT(event_initialized(io->event_read));
TR_ASSERT(event_initialized(io->event_write));
{
dbgmsg(io, "disabling ready-to-read polling");
- if (io->socket != TR_BAD_SOCKET)
+ if (need_events)
{
event_del(io->event_read);
}
{
dbgmsg(io, "disabling ready-to-write polling");
- if (io->socket != TR_BAD_SOCKET)
+ if (need_events)
{
event_del(io->event_write);
}
static void io_close_socket(tr_peerIo* io)
{
- if (io->socket != TR_BAD_SOCKET)
+ switch (io->socket.type)
{
- tr_netClose(io->session, io->socket);
- io->socket = TR_BAD_SOCKET;
+ case TR_PEER_SOCKET_TYPE_NONE:
+ break;
+
+ case TR_PEER_SOCKET_TYPE_TCP:
+ tr_netClose(io->session, io->socket.handle.tcp);
+ break;
+
+#ifdef WITH_UTP
+
+ case TR_PEER_SOCKET_TYPE_UTP:
+ UTP_SetCallbacks(io->socket.handle.utp, &dummy_utp_function_table, NULL);
+ UTP_Close(io->socket.handle.utp);
+ break;
+
+#endif
+
+ default:
+ TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
+ io->socket = TR_PEER_SOCKET_INIT;
+
if (io->event_read != NULL)
{
event_free(io->event_read);
event_free(io->event_write);
io->event_write = NULL;
}
-
-#ifdef WITH_UTP
-
- if (io->utp_socket != NULL)
- {
- UTP_SetCallbacks(io->utp_socket, &dummy_utp_function_table, NULL);
- UTP_Close(io->utp_socket);
-
- io->utp_socket = NULL;
- }
-
-#endif
}
static void io_dtor(void* vio)
io_close_socket(io);
io->socket = tr_netOpenPeerSocket(session, &io->addr, io->port, io->isSeed);
- io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io);
- io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io);
- if (io->socket != TR_BAD_SOCKET)
+ if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP)
{
- event_enable(io, pendingEvents);
- tr_netSetTOS(io->socket, session->peerSocketTOS);
- maybeSetCongestionAlgorithm(io->socket, session->peer_congestion_algorithm);
- return 0;
+ return -1;
}
- return -1;
+ io->event_read = event_new(session->event_base, io->socket.handle.tcp, EV_READ, event_read_cb, io);
+ io->event_write = event_new(session->event_base, io->socket.handle.tcp, EV_WRITE, event_write_cb, io);
+
+ event_enable(io, pendingEvents);
+ tr_netSetTOS(io->socket.handle.tcp, session->peerSocketTOS);
+ maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peer_congestion_algorithm);
+
+ return 0;
}
/**
if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch)) != 0)
{
- if (io->utp_socket != NULL) /* utp peer connection */
+ switch (io->socket.type)
{
+ case TR_PEER_SOCKET_TYPE_UTP:
/* UTP_RBDrained notifies libutp that your read buffer is emtpy.
* It opens up the congestion window by sending an ACK (soonish)
* if one was not going to be sent. */
if (evbuffer_get_length(io->inbuf) == 0)
{
- UTP_RBDrained(io->utp_socket);
+ UTP_RBDrained(io->socket.handle.utp);
}
- }
- else /* tcp peer connection */
- {
- int e;
- char err_buf[512];
- EVUTIL_SET_SOCKET_ERROR(0);
- res = evbuffer_read(io->inbuf, io->socket, (int)howmuch);
- e = EVUTIL_SOCKET_ERROR();
-
- dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");
+ break;
- if (evbuffer_get_length(io->inbuf))
+ case TR_PEER_SOCKET_TYPE_TCP:
{
- canReadWrapper(io);
- }
+ int e;
+ char err_buf[512];
- if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS)
- {
- short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
+ EVUTIL_SET_SOCKET_ERROR(0);
+ res = evbuffer_read(io->inbuf, io->socket.handle.tcp, (int)howmuch);
+ e = EVUTIL_SOCKET_ERROR();
- if (res == 0)
+ dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");
+
+ if (evbuffer_get_length(io->inbuf))
{
- what |= BEV_EVENT_EOF;
+ canReadWrapper(io);
}
- dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
- tr_net_strerror(err_buf, sizeof(err_buf), e));
+ if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS)
+ {
+ short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
+
+ if (res == 0)
+ {
+ what |= BEV_EVENT_EOF;
+ }
+
+ dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
+ tr_net_strerror(err_buf, sizeof(err_buf), e));
- io->gotError(io, what, io->userData);
+ io->gotError(io, what, io->userData);
+ }
+
+ break;
}
+
+ default:
+ TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
}
if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_UP, howmuch)) != 0)
{
- if (io->utp_socket != NULL) /* utp peer connection */
+ switch (io->socket.type)
{
- UTP_Write(io->utp_socket, howmuch);
+ case TR_PEER_SOCKET_TYPE_UTP:
+ UTP_Write(io->socket.handle.utp, howmuch);
n = old_len - evbuffer_get_length(io->outbuf);
- }
- else
- {
- int e;
-
- EVUTIL_SET_SOCKET_ERROR(0);
- n = tr_evbuffer_write(io, io->socket, howmuch);
- e = EVUTIL_SOCKET_ERROR();
+ break;
- if (n > 0)
+ case TR_PEER_SOCKET_TYPE_TCP:
{
- didWriteWrapper(io, n);
- }
+ int e;
- if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS)
- {
- char errstr[512];
- short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
+ EVUTIL_SET_SOCKET_ERROR(0);
+ n = tr_evbuffer_write(io, io->socket.handle.tcp, howmuch);
+ e = EVUTIL_SOCKET_ERROR();
+
+ if (n > 0)
+ {
+ didWriteWrapper(io, n);
+ }
+
+ if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS)
+ {
+ char errstr[512];
+ short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
- dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
- tr_net_strerror(errstr, sizeof(errstr), e));
+ dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
+ tr_net_strerror(errstr, sizeof(errstr), e));
- io->gotError(io, what, io->userData);
+ io->gotError(io, what, io->userData);
+ }
+
+ break;
}
+
+ default:
+ TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
}