From: Jordan Lee
Date: Fri, 18 Feb 2011 00:31:49 +0000 (+0000)
Subject: import libutp into third-party/ and plug it in to autoconf, automake
X-Git-Tag: 2.30b1~335
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=01f5a8b0933a505108b0ff5f6eb162c8511bac69;p=transmission

import libutp into third-party/ and plug it in to autoconf, automake
---

diff --git a/third-party/libutp/LICENSE b/third-party/libutp/LICENSE
new file mode 100644
index 000000000..73acb81ca
--- /dev/null
+++ b/third-party/libutp/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2010 BitTorrent, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/third-party/libutp/Makefile.am b/third-party/libutp/Makefile.am
new file mode 100644
index 000000000..847f596ed
--- /dev/null
+++ b/third-party/libutp/Makefile.am
@@ -0,0 +1,6 @@
+AM_CPPFLAGS = -fno-exceptions -fno-rtti -ansi -DPOSIX
+
+noinst_LIBRARIES = libutp.a
+libutp_a_SOURCES = utp.cpp utp_utils.cpp
+noinst_HEADERS = StdAfx.h templates.h utp_config_example.h utp.h utp_config.h utp_utils.h utypes.h
+EXTRA_DIST = LICENSE README.md
diff --git a/third-party/libutp/README.md b/third-party/libutp/README.md
new file mode 100644
index 000000000..d7fca4c66
--- /dev/null
+++ b/third-party/libutp/README.md
@@ -0,0 +1,59 @@
+# libutp - The uTorrent Transport Protocol library.
+Copyright (c) 2010 BitTorrent, Inc.
+
+uTP is a TCP-like implementation of [LEDBAT][ledbat] documented as a BitTorrent
+extension in [BEP-29][bep29]. uTP provides reliable, ordered delivery
+while maintaining minimum extra delay. It is implemented on top of UDP to be
+cross-platform and functional today. As a result, uTP is the primary transport
+for uTorrent peer-to-peer connections.
+
+uTP is written in C++, but the external interface is strictly C (ANSI C89).
+
+## The Interface
+
+The uTP socket interface is a bit different from the Berkeley socket API to
+avoid the need for our own select() implementation, and to make it easier to
+write event-based code with minimal buffering.
+
+When you create a uTP socket, you register a set of callbacks. Most notably, the
+on_read callback is a reactive callback which occurs when bytes arrive off the
+network. The write side of the socket is proactive, and you call UTP_Write to
+indicate the number of bytes you wish to write. As packets are created, the
+on_write callback is called for each packet, so you can fill the buffers with
+data.
+
+The libutp interface is not thread-safe. It was designed for use in a
+single-threaded asynchronous context, although with proper synchronization
+it may be used from a multi-threaded environment as well.
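+
+As a minimal sketch of that flow (the callback signatures below follow the
+UTPFunctionTable used in this import; the exact socket-creation and
+callback-registration calls live in utp.h, so treat the names my_on_read and
+my_on_write as illustrative, not part of the API):
+
+    /* Reactive: `count` payload bytes just arrived off the network.
+       Consume them now; libutp reclaims the buffer when this returns. */
+    static void my_on_read(void *userdata, const byte *bytes, size_t count)
+    {
+        /* append bytes[0..count) to the application's input stream */
+    }
+
+    /* Proactive: after UTP_Write(socket, n), this is called once per
+       outgoing packet; copy the next `count` bytes of your stream in. */
+    static void my_on_write(void *userdata, byte *bytes, size_t count)
+    {
+        /* memcpy(bytes, next_chunk, count); */
+    }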
+
+See utp.h for more details and other API documentation.
+
+## Examples
+
+See the utp_test and utp_file directories for examples.
+
+## Building
+
+uTP has been known to build on Windows with MSVC and on Linux and OS X with gcc.
+On Windows, use the MSVC project files (utp.sln, and friends). On other platforms,
+building the shared library is as simple as:
+
+    make
+
+To build one of the examples, which will statically link in everything it needs
+from libutp:
+
+    cd utp_test && make
+
+## License
+
+libutp is released under the [MIT][lic] license.
+
+## Related Work
+
+Research and analysis of congestion control mechanisms can be found [here][survey].
+
+[ledbat]: http://datatracker.ietf.org/wg/ledbat/charter/
+[bep29]: http://www.bittorrent.org/beps/bep_0029.html
+[lic]: http://www.opensource.org/licenses/mit-license.php
+[survey]: http://datatracker.ietf.org/doc/draft-ietf-ledbat-survey/
diff --git a/third-party/libutp/StdAfx.h b/third-party/libutp/StdAfx.h
new file mode 100644
index 000000000..a92dfadfb
--- /dev/null
+++ b/third-party/libutp/StdAfx.h
@@ -0,0 +1,11 @@
+#if !defined(AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_)
+#define AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_
+
+#if _MSC_VER > 1000
+#pragma once
+#endif // _MSC_VER > 1000
+
+// I don't have anything to put here, but some projects use precompiled headers,
+// so I include StdAfx.h anyway, so they don't have to edit the files to compile normally.
+
+#endif // !defined(AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_)
diff --git a/third-party/libutp/templates.h b/third-party/libutp/templates.h
new file mode 100644
index 000000000..9e98fb3d7
--- /dev/null
+++ b/third-party/libutp/templates.h
@@ -0,0 +1,164 @@
+#ifndef __TEMPLATES_H__
+#define __TEMPLATES_H__
+
+#include "utypes.h"
+#include <assert.h>
+
+#if defined(POSIX)
+/* Allow over-writing FORCEINLINE from makefile because gcc 3.4.4 for buffalo
+   doesn't seem to support __attribute__((always_inline)) in -O0 build
+   (strangely, it works in -Os build) */
+#ifndef FORCEINLINE
+// The always_inline attribute asks gcc to inline the function even if no optimization is being requested.
+// This macro should be used exclusive-or with the inline directive (use one or the other but not both),
+// since Microsoft uses __forceinline to also mean inline,
+// and this code is following a Microsoft compatibility model.
+// Just setting the attribute without also specifying the inline directive apparently won't inline the function,
+// as evidenced by multiply-defined symbols found at link time.
+#define FORCEINLINE inline __attribute__((always_inline)) +#endif +#endif + +// Utility templates +#undef min +#undef max + +template static inline T min(T a, T b) { if (a < b) return a; return b; } +template static inline T max(T a, T b) { if (a > b) return a; return b; } + +template static inline T min(T a, T b, T c) { return min(min(a,b),c); } +template static inline T max(T a, T b, T c) { return max(max(a,b),c); } +template static inline T clamp(T v, T mi, T ma) +{ + if (v > ma) v = ma; + if (v < mi) v = mi; + return v; +} + +#pragma pack(push,1) + +namespace aux +{ + FORCEINLINE uint16 host_to_network(uint16 i) { return htons(i); } + FORCEINLINE uint32 host_to_network(uint32 i) { return htonl(i); } + FORCEINLINE int32 host_to_network(int32 i) { return htonl(i); } + FORCEINLINE uint16 network_to_host(uint16 i) { return ntohs(i); } + FORCEINLINE uint32 network_to_host(uint32 i) { return ntohl(i); } + FORCEINLINE int32 network_to_host(int32 i) { return ntohl(i); } +} + +template +struct big_endian +{ + T operator=(T i) { m_integer = aux::host_to_network(i); return i; } + operator T() const { return aux::network_to_host(m_integer); } +private: + T m_integer; +}; + +typedef big_endian int32_big; +typedef big_endian uint32_big; +typedef big_endian uint16_big; + +#pragma pack(pop) + +template static inline void zeromem(T *a, size_t count = 1) { memset(a, 0, count * sizeof(T)); } + +typedef int SortCompareProc(const void *, const void *); + +template static FORCEINLINE void QuickSortT(T *base, size_t num, int (*comp)(const T *, const T *)) { qsort(base, num, sizeof(T), (SortCompareProc*)comp); } + + +// WARNING: The template parameter MUST be a POD type! +template class Array { +protected: + T *mem; + size_t alloc,count; + +public: + Array(size_t init) { Init(init); } + Array() { Init(); } + ~Array() { Free(); } + + void inline Init() { mem = NULL; alloc = count = 0; } + void inline Init(size_t init) { Init(); if (init) Resize(init); } + size_t inline GetCount() const { return count; } + size_t inline GetAlloc() const { return alloc; } + void inline SetCount(size_t c) { count = c; } + + inline T& operator[](size_t offset) { assert(offset ==0 || offset(minsize, alloc * 2)); } + + inline size_t Append(const T &t) { + if (count >= alloc) Grow(); + size_t r=count++; + mem[r] = t; + return r; + } + + T inline &Append() { + if (count >= alloc) Grow(); + return mem[count++]; + } + + void inline Compact() { + Resize(count); + } + + void inline Free() { + free(mem); + Init(); + } + + void inline Clear() { + count = 0; + } + + bool inline MoveUpLast(size_t index) { + assert(index < count); + size_t c = --count; + if (index != c) { + mem[index] = mem[c]; + return true; + } + return false; + } + + bool inline MoveUpLastExist(const T &v) { + return MoveUpLast(LookupElementExist(v)); + } + + size_t inline LookupElement(const T &v) const { + for(size_t i = 0; i != count; i++) + if (mem[i] == v) + return i; + return (size_t) -1; + } + + bool inline HasElement(const T &v) const { + return LookupElement(v) != -1; + } + + typedef int SortCompareProc(const T *a, const T *b); + + void Sort(SortCompareProc* proc, size_t start, size_t end) { + QuickSortT(&mem[start], end - start, proc); + } + + void Sort(SortCompareProc* proc, size_t start) { + Sort(proc, start, count); + } + + void Sort(SortCompareProc* proc) { + Sort(proc, 0, count); + } +}; + +#endif //__TEMPLATES_H__ diff --git a/third-party/libutp/utp.cpp b/third-party/libutp/utp.cpp new file mode 100644 index 000000000..16c0bf74d --- /dev/null +++ 
b/third-party/libutp/utp.cpp @@ -0,0 +1,2841 @@ +#include + +#include "utp.h" +#include "templates.h" + +#include +#include +#include +#include +#include +#include +#include // for UINT_MAX + +#ifdef WIN32 +#include "win32_inet_ntop.h" + +// newer versions of MSVC define these in errno.h +#ifndef ECONNRESET +#define ECONNRESET WSAECONNRESET +#define EMSGSIZE WSAEMSGSIZE +#define ECONNREFUSED WSAECONNREFUSED +#define ETIMEDOUT WSAETIMEDOUT +#endif +#endif + +#ifdef POSIX +typedef sockaddr_storage SOCKADDR_STORAGE; +#endif // POSIX + +// number of bytes to increase max window size by, per RTT. This is +// scaled down linearly proportional to off_target. i.e. if all packets +// in one window have 0 delay, window size will increase by this number. +// Typically it's less. TCP increases one MSS per RTT, which is 1500 +#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000 +#define CUR_DELAY_SIZE 3 +// experiments suggest that a clock skew of 10 ms per 325 seconds +// is not impossible. Reset delay_base every 13 minutes. The clock +// skew is dealt with by observing the delay base in the other +// direction, and adjusting our own upwards if the opposite direction +// delay base keeps going down +#define DELAY_BASE_HISTORY 13 +#define MAX_WINDOW_DECAY 100 // ms + +#define REORDER_BUFFER_SIZE 32 +#define REORDER_BUFFER_MAX_SIZE 511 +#define OUTGOING_BUFFER_MAX_SIZE 511 + +#define PACKET_SIZE 350 + +// this is the minimum max_window value. It can never drop below this +#define MIN_WINDOW_SIZE 10 + +// when window sizes are smaller than one packet_size, this +// will pace the packets to average at the given window size +// if it's not set, it will simply not send anything until +// there's a timeout +#define USE_PACKET_PACING 1 + +// if we receive 4 or more duplicate acks, we resend the packet +// that hasn't been acked yet +#define DUPLICATE_ACKS_BEFORE_RESEND 3 + +#define DELAYED_ACK_BYTE_THRESHOLD 2400 // bytes +#define DELAYED_ACK_TIME_THRESHOLD 100 // milliseconds + +#define RST_INFO_TIMEOUT 10000 +#define RST_INFO_LIMIT 1000 +// 29 seconds determined from measuring many home NAT devices +#define KEEPALIVE_INTERVAL 29000 + + +#define SEQ_NR_MASK 0xFFFF +#define ACK_NR_MASK 0xFFFF + +#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom) + +#include "utp_utils.h" +#include "utp_config.h" + +#define LOG_UTP if (g_log_utp) utp_log +#define LOG_UTPV if (g_log_utp_verbose) utp_log + +uint32 g_current_ms; + +// The totals are derived from the following data: +// 45: IPv6 address including embedded IPv4 address +// 11: Scope Id +// 2: Brackets around IPv6 address when port is present +// 6: Port (including colon) +// 1: Terminating null byte +char addrbuf[65]; +char addrbuf2[65]; +#define addrfmt(x, s) x.fmt(s, sizeof(s)) + +#pragma pack(push,1) + +struct PackedSockAddr { + + // The values are always stored here in network byte order + union { + byte _in6[16]; // IPv6 + uint16 _in6w[8]; // IPv6, word based (for convenience) + uint32 _in6d[4]; // Dword access + in6_addr _in6addr; // For convenience + } _in; + + // Host byte order + uint16 _port; + +#define _sin4 _in._in6d[3] // IPv4 is stored where it goes if mapped + +#define _sin6 _in._in6 +#define _sin6w _in._in6w +#define _sin6d _in._in6d + + byte get_family() const + { + return (IN6_IS_ADDR_V4MAPPED((in6_addr*)_sin6) != 0) ? 
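+			// (A v4-mapped address has the form ::ffff:a.b.c.d -- the
+			// constructor below writes 0xffff into _sin6w[5] and the IPv4
+			// address into _in6d[3], which is what the _sin4 macro aliases.)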
AF_INET : AF_INET6; + } + + bool operator==(const PackedSockAddr& rhs) const + { + if (&rhs == this) + return true; + if (_port != rhs._port) + return false; + return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0; + } + bool operator!=(const PackedSockAddr& rhs) const { return !(*this == rhs); } + + PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len) + { + if (sa->ss_family == AF_INET) { + assert(len >= sizeof(sockaddr_in)); + const sockaddr_in *sin = (sockaddr_in*)sa; + _sin6w[0] = 0; + _sin6w[1] = 0; + _sin6w[2] = 0; + _sin6w[3] = 0; + _sin6w[4] = 0; + _sin6w[5] = 0xffff; + _sin4 = sin->sin_addr.s_addr; + _port = ntohs(sin->sin_port); + } else { + assert(len >= sizeof(sockaddr_in6)); + const sockaddr_in6 *sin6 = (sockaddr_in6*)sa; + _in._in6addr = sin6->sin6_addr; + _port = ntohs(sin6->sin6_port); + } + } + + SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len = NULL) const + { + SOCKADDR_STORAGE sa; + const byte family = get_family(); + if (family == AF_INET) { + sockaddr_in *sin = (sockaddr_in*)&sa; + if (len) *len = sizeof(sockaddr_in); + memset(sin, 0, sizeof(sockaddr_in)); + sin->sin_family = family; + sin->sin_port = htons(_port); + sin->sin_addr.s_addr = _sin4; + } else { + sockaddr_in6 *sin6 = (sockaddr_in6*)&sa; + memset(sin6, 0, sizeof(sockaddr_in6)); + if (len) *len = sizeof(sockaddr_in6); + sin6->sin6_family = family; + sin6->sin6_addr = _in._in6addr; + sin6->sin6_port = htons(_port); + } + return sa; + } + + cstr fmt(str s, size_t len) const + { + memset(s, 0, len); + const byte family = get_family(); + str i; + if (family == AF_INET) { + inet_ntop(family, (uint32*)&_sin4, s, len); + i = s; + while (*++i) {} + } else { + i = s; + *i++ = '['; + inet_ntop(family, (in6_addr*)&_in._in6addr, i, len-1); + while (*++i) {} + *i++ = ']'; + } + snprintf(i, len - (i-s), ":%u", _port); + return s; + } +}; + +struct RST_Info { + PackedSockAddr addr; + uint32 connid; + uint32 timestamp; + uint16 ack_nr; +}; + +// these packet sizes are including the uTP header wich +// is either 20 or 23 bytes depending on version +#define PACKET_SIZE_EMPTY_BUCKET 0 +#define PACKET_SIZE_EMPTY 23 +#define PACKET_SIZE_SMALL_BUCKET 1 +#define PACKET_SIZE_SMALL 373 +#define PACKET_SIZE_MID_BUCKET 2 +#define PACKET_SIZE_MID 723 +#define PACKET_SIZE_BIG_BUCKET 3 +#define PACKET_SIZE_BIG 1400 +#define PACKET_SIZE_HUGE_BUCKET 4 + +struct PacketFormat { + // connection ID + uint32_big connid; + uint32_big tv_sec; + uint32_big tv_usec; + uint32_big reply_micro; + // receive window size in PACKET_SIZE chunks + byte windowsize; + // Type of the first extension header + byte ext; + // Flags + byte flags; + // Sequence number + uint16_big seq_nr; + // Acknowledgment number + uint16_big ack_nr; +}; + +struct PacketFormatAck { + PacketFormat pf; + byte ext_next; + byte ext_len; + byte acks[4]; +}; + +struct PacketFormatExtensions { + PacketFormat pf; + byte ext_next; + byte ext_len; + byte extensions[8]; +}; + +struct PacketFormatV1 { + // protocol version + byte version:4; + // type (formerly flags) + byte type:4; + // Type of the first extension header + byte ext; + // connection ID + uint16_big connid; + uint32_big tv_usec; + uint32_big reply_micro; + // receive window size in bytes + uint32_big windowsize; + // Sequence number + uint16_big seq_nr; + // Acknowledgment number + uint16_big ack_nr; +}; + +struct PacketFormatAckV1 { + PacketFormatV1 pf; + byte ext_next; + byte ext_len; + byte acks[4]; +}; + +struct PacketFormatExtensionsV1 { + PacketFormatV1 pf; + byte ext_next; + byte ext_len; + byte extensions[8]; 
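+	// (With #pragma pack(1) in effect these headers have fixed wire sizes:
+	// the version-0 PacketFormat above is 23 bytes and PacketFormatV1 is 20,
+	// which is where the "20 or 23 bytes depending on version" note above
+	// the PACKET_SIZE_* buckets comes from.)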
+}; + +#pragma pack(pop) + +enum { + ST_DATA = 0, // Data packet. + ST_FIN = 1, // Finalize the connection. This is the last packet. + ST_STATE = 2, // State packet. Used to transmit an ACK with no data. + ST_RESET = 3, // Terminate connection forcefully. + ST_SYN = 4, // Connect SYN + ST_NUM_STATES, // used for bounds checking +}; + +static const cstr flagnames[] = { + "ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN" +}; + +enum CONN_STATE { + CS_IDLE = 0, + CS_SYN_SENT = 1, + CS_CONNECTED = 2, + CS_CONNECTED_FULL = 3, + CS_GOT_FIN = 4, + CS_DESTROY_DELAY = 5, + CS_FIN_SENT = 6, + CS_RESET = 7, + CS_DESTROY = 8, +}; + +static const cstr statenames[] = { + "IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY" +}; + +struct OutgoingPacket { + size_t length; + size_t payload; + uint64 time_sent; // microseconds + uint transmissions:31; + bool need_resend:1; + byte data[1]; +}; + +void no_read(void *socket, const byte *bytes, size_t count) {} +void no_write(void *socket, byte *bytes, size_t count) {} +size_t no_rb_size(void *socket) { return 0; } +void no_state(void *socket, int state) {} +void no_error(void *socket, int errcode) {} +void no_overhead(void *socket, bool send, size_t count, int type) {} + +UTPFunctionTable zero_funcs = { + &no_read, + &no_write, + &no_rb_size, + &no_state, + &no_error, + &no_overhead, +}; + +struct SizableCircularBuffer { + // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size. + size_t mask; + // This is the elements that the circular buffer points to + void **elements; + + void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; } + void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; } + + void grow(size_t item, size_t index); + void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); } + size_t size() { return mask + 1; } +}; + +static struct UTPGlobalStats _global_stats; + +// Item contains the element we want to make space for +// index is the index in the list. +void SizableCircularBuffer::grow(size_t item, size_t index) +{ + // Figure out the new size. + size_t size = mask + 1; + do size *= 2; while (index >= size); + + // Allocate the new buffer + void **buf = (void**)calloc(size, sizeof(void*)); + + size--; + + // Copy elements from the old buffer to the new buffer + for (size_t i = 0; i <= mask; i++) { + buf[(item - index + i) & size] = get(item - index + i); + } + + // Swap to the newly allocated buffer + mask = size; + free(elements); + elements = buf; +} + +// compare if lhs is less than rhs, taking wrapping +// into account. if lhs is close to UINT_MAX and rhs +// is close to 0, lhs is assumed to have wrapped and +// considered smaller +bool wrapping_compare_less(uint32 lhs, uint32 rhs) +{ + // distance walking from lhs to rhs, downwards + const uint32 dist_down = lhs - rhs; + // distance walking from lhs to rhs, upwards + const uint32 dist_up = rhs - lhs; + + // if the distance walking up is shorter, lhs + // is less than rhs. If the distance walking down + // is shorter, then rhs is less than lhs + return dist_up < dist_down; +} + +struct DelayHist { + uint32 delay_base; + + // this is the history of delay samples, + // normalized by using the delay_base. These + // values are always greater than 0 and measures + // the queuing delay in microseconds + uint32 cur_delay_hist[CUR_DELAY_SIZE]; + size_t cur_delay_idx; + + // this is the history of delay_base. 
It's + // a number that doesn't have an absolute meaning + // only relative. It doesn't make sense to initialize + // it to anything other than values relative to + // what's been seen in the real world. + uint32 delay_base_hist[DELAY_BASE_HISTORY]; + size_t delay_base_idx; + // the time when we last stepped the delay_base_idx + uint32 delay_base_time; + + bool delay_base_initialized; + + void clear() + { + delay_base_initialized = false; + delay_base = 0; + cur_delay_idx = 0; + delay_base_idx = 0; + delay_base_time = g_current_ms; + for (size_t i = 0; i < CUR_DELAY_SIZE; i++) { + cur_delay_hist[i] = 0; + } + for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) { + delay_base_hist[i] = 0; + } + } + + void shift(const uint32 offset) + { + // the offset should never be "negative" + assert(offset < 0x10000000); + + // increase all of our base delays by this amount + // this is used to take clock skew into account + // by observing the other side's changes in its base_delay + for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) { + delay_base_hist[i] += offset; + } + delay_base += offset; + } + + void add_sample(const uint32 sample) + { + // The two clocks (in the two peers) are assumed not to + // progress at the exact same rate. They are assumed to be + // drifting, which causes the delay samples to contain + // a systematic error, either they are under- + // estimated or over-estimated. This is why we update the + // delay_base every two minutes, to adjust for this. + + // This means the values will keep drifting and eventually wrap. + // We can cross the wrapping boundry in two directions, either + // going up, crossing the highest value, or going down, crossing 0. + + // if the delay_base is close to the max value and sample actually + // wrapped on the other end we would see something like this: + // delay_base = 0xffffff00, sample = 0x00000400 + // sample - delay_base = 0x500 which is the correct difference + + // if the delay_base is instead close to 0, and we got an even lower + // sample (that will eventually update the delay_base), we may see + // something like this: + // delay_base = 0x00000400, sample = 0xffffff00 + // sample - delay_base = 0xfffffb00 + // this needs to be interpreted as a negative number and the actual + // recorded delay should be 0. + + // It is important that all arithmetic that assume wrapping + // is done with unsigned intergers. Signed integers are not guaranteed + // to wrap the way unsigned integers do. At least GCC takes advantage + // of this relaxed rule and won't necessarily wrap signed ints. + + // remove the clock offset and propagation delay. + // delay base is min of the sample and the current + // delay base. This min-operation is subject to wrapping + // and care needs to be taken to correctly choose the + // true minimum. + + // specifically the problem case is when delay_base is very small + // and sample is very large (because it wrapped past zero), sample + // needs to be considered the smaller + + if (!delay_base_initialized) { + // delay_base being 0 suggests that we haven't initialized + // it or its history with any real measurements yet. Initialize + // everything with this sample. 
+ for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) { + // if we don't have a value, set it to the current sample + delay_base_hist[i] = sample; + continue; + } + delay_base = sample; + delay_base_initialized = true; + } + + if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx])) { + // sample is smaller than the current delay_base_hist entry + // update it + delay_base_hist[delay_base_idx] = sample; + } + + // is sample lower than delay_base? If so, update delay_base + if (wrapping_compare_less(sample, delay_base)) { + // sample is smaller than the current delay_base + // update it + delay_base = sample; + } + + // this operation may wrap, and is supposed to + const uint32 delay = sample - delay_base; + // sanity check. If this is triggered, something fishy is going on + // it means the measured sample was greater than 32 seconds! +// assert(delay < 0x2000000); + + cur_delay_hist[cur_delay_idx] = delay; + cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE; + + // once every minute + if (g_current_ms - delay_base_time > 60 * 1000) { + delay_base_time = g_current_ms; + delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY; + // clear up the new delay base history spot by initializing + // it to the current sample, then update it + delay_base_hist[delay_base_idx] = sample; + delay_base = delay_base_hist[0]; + // Assign the lowest delay in the last 2 minutes to delay_base + for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) { + if (wrapping_compare_less(delay_base_hist[i], delay_base)) + delay_base = delay_base_hist[i]; + } + } + } + + uint32 get_value() + { + uint32 value = UINT_MAX; + for (size_t i = 0; i < CUR_DELAY_SIZE; i++) { + value = min(cur_delay_hist[i], value); + } + // value could be UINT_MAX if we have no samples yet... + return value; + } +}; + +struct UTPSocket { + PackedSockAddr addr; + + size_t idx; + + uint16 reorder_count; + byte duplicate_ack; + + // the number of bytes we've received but not acked yet + size_t bytes_since_ack; + + // the number of packets in the send queue. Packets that haven't + // yet been sent count as well as packets marked as needing resend + // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets + uint16 cur_window_packets; + + // how much of the window is used, number of bytes in-flight + // packets that have not yet been sent do not count, packets + // that are marked as needing to be re-sent (due to a timeout) + // don't count either + size_t cur_window; + // maximum window size, in bytes + size_t max_window; + // SO_SNDBUF setting, in bytes + size_t opt_sndbuf; + // SO_RCVBUF setting, in bytes + size_t opt_rcvbuf; + + // Is a FIN packet in the reassembly buffer? + bool got_fin:1; + // Timeout procedure + bool fast_timeout:1; + + // max receive window for other end, in bytes + size_t max_window_user; + // 0 = original uTP header, 1 = second revision + byte version; + CONN_STATE state; + // TickCount when we last decayed window (wraps) + int32 last_rwin_decay; + + // the sequence number of the FIN packet. This field is only set + // when we have received a FIN, and the flag field has the FIN flag set. + // it is used to know when it is safe to destroy the socket, we must have + // received all packets up to this sequence number first. + uint16 eof_pkt; + + // All sequence numbers up to including this have been properly received + // by us + uint16 ack_nr; + // This is the sequence number for the next packet to be sent. 
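+	// (Sequence arithmetic is mod 2^16 throughout: for instance, the
+	// oldest un-acked packet in flight is seq_nr - cur_window_packets,
+	// computed in wrapping uint16 arithmetic, which is how the timeout
+	// and invariant code below index into outbuf.)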
+ uint16 seq_nr; + + uint16 timeout_seq_nr; + + // This is the sequence number of the next packet we're allowed to + // do a fast resend with. This makes sure we only do a fast-resend + // once per packet. We can resend the packet with this sequence number + // or any later packet (with a higher sequence number). + uint16 fast_resend_seq_nr; + + uint32 reply_micro; + + // the time when we need to send another ack. If there's + // nothing to ack, this is a very large number + uint32 ack_time; + + uint32 last_got_packet; + uint32 last_sent_packet; + uint32 last_measured_delay; + uint32 last_maxed_out_window; + + // the last time we added send quota to the connection + // when adding send quota, this is subtracted from the + // current time multiplied by max_window / rtt + // which is the current allowed send rate. + int32 last_send_quota; + + // the number of bytes we are allowed to send on + // this connection. If this is more than one packet + // size when we run out of data to send, it is clamped + // to the packet size + // this value is multiplied by 100 in order to get + // higher accuracy when dealing with low rates + int32 send_quota; + + SendToProc *send_to_proc; + void *send_to_userdata; + UTPFunctionTable func; + void *userdata; + + // Round trip time + uint rtt; + // Round trip time variance + uint rtt_var; + // Round trip timeout + uint rto; + DelayHist rtt_hist; + uint retransmit_timeout; + // The RTO timer will timeout here. + uint rto_timeout; + // When the window size is set to zero, start this timer. It will send a new packet every 30secs. + uint32 zerowindow_time; + + uint32 conn_seed; + // Connection ID for packets I receive + uint32 conn_id_recv; + // Connection ID for packets I send + uint32 conn_id_send; + // Last rcv window we advertised, in bytes + size_t last_rcv_win; + + DelayHist our_hist; + DelayHist their_hist; + + // extension bytes from SYN packet + byte extensions[8]; + + SizableCircularBuffer inbuf, outbuf; + +#ifdef _DEBUG + // Public stats, returned by UTP_GetStats(). See utp.h + UTPStats _stats; +#endif // _DEBUG + + // Calculates the current receive window + size_t get_rcv_window() const + { + // If we don't have a connection (such as during connection + // establishment, always act as if we have an empty buffer). + if (!userdata) return opt_rcvbuf; + + // Trim window down according to what's already in buffer. + const size_t numbuf = func.get_rb_size(userdata); + assert((int)numbuf >= 0); + return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0; + } + + // Test if we're ready to decay max_window + // XXX this breaks when spaced by > INT_MAX/2, which is 49 + // days; the failure mode in that case is we do an extra decay + // or fail to do one when we really shouldn't. + bool can_decay_win(int32 msec) const + { + return msec - last_rwin_decay >= MAX_WINDOW_DECAY; + } + + // If we can, decay max window, returns true if we actually did so + void maybe_decay_win() + { + if (can_decay_win(g_current_ms)) { + // TCP uses 0.5 + max_window = (size_t)(max_window * .5); + last_rwin_decay = g_current_ms; + if (max_window < MIN_WINDOW_SIZE) + max_window = MIN_WINDOW_SIZE; + } + } + + size_t get_header_size() const + { + return (version ? sizeof(PacketFormatV1) : sizeof(PacketFormat)); + } + + size_t get_header_extensions_size() const + { + return (version ? 
sizeof(PacketFormatExtensionsV1) : sizeof(PacketFormatExtensions)); + } + + void sent_ack() + { + ack_time = g_current_ms + 0x70000000; + bytes_since_ack = 0; + } + + size_t get_udp_mtu() const + { + socklen_t len; + SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len); + return UTP_GetUDPMTU((const struct sockaddr *)&sa, len); + } + + size_t get_udp_overhead() const + { + socklen_t len; + SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len); + return UTP_GetUDPOverhead((const struct sockaddr *)&sa, len); + } + + uint64 get_global_utp_bytes_sent() const + { + socklen_t len; + SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len); + return UTP_GetGlobalUTPBytesSent((const struct sockaddr *)&sa, len); + } + + size_t get_overhead() const + { + return get_udp_overhead() + get_header_size(); + } + + void send_data(PacketFormat* b, size_t length, bandwidth_type_t type); + + void send_ack(bool synack = false); + + void send_keep_alive(); + + static void send_rst(SendToProc *send_to_proc, void *send_to_userdata, + const PackedSockAddr &addr, uint32 conn_id_send, + uint16 ack_nr, uint16 seq_nr, byte version); + + void send_packet(OutgoingPacket *pkt); + + bool is_writable(size_t to_write); + + bool flush_packets(); + + void write_outgoing_packet(size_t payload, uint flags); + + void update_send_quota(); + +#ifdef _DEBUG + void check_invariant(); +#endif + + void check_timeouts(); + + int ack_packet(uint16 seq); + + size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt); + + void selective_ack(uint base, const byte *mask, byte len); + + void apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt); + + size_t get_packet_size(); +}; + +Array g_rst_info; +Array g_utp_sockets; + +static void UTP_RegisterSentPacket(size_t length) { + if (length <= PACKET_SIZE_MID) { + if (length <= PACKET_SIZE_EMPTY) { + _global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++; + } else if (length <= PACKET_SIZE_SMALL) { + _global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++; + } else + _global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++; + } else { + if (length <= PACKET_SIZE_BIG) { + _global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++; + } else + _global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++; + } +} + +void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, const byte *p, size_t len, const PackedSockAddr &addr) +{ + socklen_t tolen; + SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen); + UTP_RegisterSentPacket(len); + send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen); +} + +void UTPSocket::send_data(PacketFormat* b, size_t length, bandwidth_type_t type) +{ + // time stamp this packet with local time, the stamp goes into + // the header of every packet at the 8th byte for 8 bytes : + // two integers, check packet.h for more + uint64 time = UTP_GetMicroseconds(); + + PacketFormatV1* b1 = (PacketFormatV1*)b; + if (version == 0) { + b->tv_sec = (uint32)(time / 1000000); + b->tv_usec = time % 1000000; + b->reply_micro = reply_micro; + } else { + b1->tv_usec = (uint32)time; + b1->reply_micro = reply_micro; + } + + last_sent_packet = g_current_ms; + +#ifdef _DEBUG + _stats._nbytes_xmit += length; + ++_stats._nxmit; +#endif + if (userdata) { + size_t n; + if (type == payload_bandwidth) { + // if this packet carries payload, just + // count the header as overhead + type = header_overhead; + n = get_overhead(); + } else { + n = length + get_udp_overhead(); + } + func.on_overhead(userdata, true, n, type); + } +#if g_log_utp_verbose + int 
flags = version == 0 ? b->flags : b1->type; + uint16 seq_nr = version == 0 ? b->seq_nr : b1->seq_nr; + uint16 ack_nr = version == 0 ? b->ack_nr : b1->ack_nr; + LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u", + this, addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags], + seq_nr, ack_nr); +#endif + send_to_addr(send_to_proc, send_to_userdata, (const byte*)b, length, addr); +} + +void UTPSocket::send_ack(bool synack) +{ + PacketFormatExtensions pfe; + zeromem(&pfe); + PacketFormatExtensionsV1& pfe1 = (PacketFormatExtensionsV1&)pfe; + PacketFormatAck& pfa = (PacketFormatAck&)pfe1; + PacketFormatAckV1& pfa1 = (PacketFormatAckV1&)pfe1; + + size_t len; + last_rcv_win = get_rcv_window(); + if (version == 0) { + pfa.pf.connid = conn_id_send; + pfa.pf.ack_nr = (uint16)ack_nr; + pfa.pf.seq_nr = (uint16)seq_nr; + pfa.pf.flags = ST_STATE; + pfa.pf.ext = 0; + pfa.pf.windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE); + len = sizeof(PacketFormat); + } else { + pfa1.pf.version = 1; + pfa1.pf.type = ST_STATE; + pfa1.pf.ext = 0; + pfa1.pf.connid = conn_id_send; + pfa1.pf.ack_nr = ack_nr; + pfa1.pf.seq_nr = seq_nr; + pfa1.pf.windowsize = (uint32)last_rcv_win; + len = sizeof(PacketFormatV1); + } + + // we never need to send EACK for connections + // that are shutting down + if (reorder_count != 0 && state < CS_GOT_FIN) { + // if reorder count > 0, send an EACK. + // reorder count should always be 0 + // for synacks, so this should not be + // as synack + assert(!synack); + if (version == 0) { + pfa.pf.ext = 1; + pfa.ext_next = 0; + pfa.ext_len = 4; + } else { + pfa1.pf.ext = 1; + pfa1.ext_next = 0; + pfa1.ext_len = 4; + } + uint m = 0; + + // reorder count should only be non-zero + // if the packet ack_nr + 1 has not yet + // been received + assert(inbuf.get(ack_nr + 1) == NULL); + size_t window = min(14+16, inbuf.size()); + // Generate bit mask of segments received. 
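+		// (Bit i of this mask describes sequence number ack_nr + i + 2;
+		// with ack_nr = 100, say, bit 0 reports whether packet 102 has
+		// arrived. Packet 101 is never in the mask: had it arrived,
+		// ack_nr itself would have advanced past it, per the assert above.)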
+ for (size_t i = 0; i < window; i++) { + if (inbuf.get(ack_nr + i + 2) != NULL) { + m |= 1 << i; + LOG_UTPV("0x%08x: EACK packet [%u]", this, ack_nr + i + 2); + } + } + if (version == 0) { + pfa.acks[0] = (byte)m; + pfa.acks[1] = (byte)(m >> 8); + pfa.acks[2] = (byte)(m >> 16); + pfa.acks[3] = (byte)(m >> 24); + } else { + pfa1.acks[0] = (byte)m; + pfa1.acks[1] = (byte)(m >> 8); + pfa1.acks[2] = (byte)(m >> 16); + pfa1.acks[3] = (byte)(m >> 24); + } + len += 4 + 2; + LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", this, ack_nr, conn_id_send, m); + } else if (synack) { + // we only send "extensions" in response to SYN + // and the reorder count is 0 in that state + + LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", this, ack_nr, conn_id_send); + if (version == 0) { + pfe.pf.ext = 2; + pfe.ext_next = 0; + pfe.ext_len = 8; + memset(pfe.extensions, 0, 8); + } else { + pfe1.pf.ext = 2; + pfe1.ext_next = 0; + pfe1.ext_len = 8; + memset(pfe1.extensions, 0, 8); + } + len += 8 + 2; + } else { + LOG_UTPV("0x%08x: Sending ACK %u [%u]", this, ack_nr, conn_id_send); + } + + sent_ack(); + send_data((PacketFormat*)&pfe, len, ack_overhead); +} + +void UTPSocket::send_keep_alive() +{ + ack_nr--; + LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", this, ack_nr, conn_id_send); + send_ack(); + ack_nr++; +} + +void UTPSocket::send_rst(SendToProc *send_to_proc, void *send_to_userdata, + const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version) +{ + PacketFormat pf; + zeromem(&pf); + PacketFormatV1& pf1 = (PacketFormatV1&)pf; + + size_t len; + if (version == 0) { + pf.connid = conn_id_send; + pf.ack_nr = ack_nr; + pf.seq_nr = seq_nr; + pf.flags = ST_RESET; + pf.ext = 0; + pf.windowsize = 0; + len = sizeof(PacketFormat); + } else { + pf1.version = 1; + pf1.type= ST_RESET; + pf1.ext = 0; + pf1.connid = conn_id_send; + pf1.ack_nr = ack_nr; + pf1.seq_nr = seq_nr; + pf1.windowsize = 0; + len = sizeof(PacketFormatV1); + } + + LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr); + LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send); + send_to_addr(send_to_proc, send_to_userdata, (const byte*)&pf1, len, addr); +} + +void UTPSocket::send_packet(OutgoingPacket *pkt) +{ + // only count against the quota the first time we + // send the packet. Don't enforce quota when closing + // a socket. Only enforce the quota when we're sending + // at slow rates (max window < packet size) + size_t max_send = min(max_window, opt_sndbuf, max_window_user); + + if (pkt->transmissions == 0 || pkt->need_resend) { + cur_window += pkt->payload; + } + + size_t packet_size = get_packet_size(); + if (pkt->transmissions == 0 && max_send < packet_size) { + assert(state == CS_FIN_SENT || + (int32)pkt->payload <= send_quota / 100); + send_quota = send_quota - (int32)(pkt->payload * 100); + } + + pkt->need_resend = false; + + PacketFormatV1* p1 = (PacketFormatV1*)pkt->data; + PacketFormat* p = (PacketFormat*)pkt->data; + if (version == 0) { + p->ack_nr = ack_nr; + } else { + p1->ack_nr = ack_nr; + } + pkt->time_sent = UTP_GetMicroseconds(); + pkt->transmissions++; + sent_ack(); + send_data((PacketFormat*)pkt->data, pkt->length, + (state == CS_SYN_SENT) ? connect_overhead + : (pkt->transmissions == 1) ? payload_bandwidth + : retransmit_overhead); +} + +bool UTPSocket::is_writable(size_t to_write) +{ + // return true if it's OK to stuff another packet into the + // outgoing queue. 
Since we may be using packet pacing, we + // might not actually send the packet right away to affect the + // cur_window. The only thing that happens when we add another + // packet is that cur_window_packets is increased. + size_t max_send = min(max_window, opt_sndbuf, max_window_user); + + size_t packet_size = get_packet_size(); + + if (cur_window + packet_size >= max_window) + last_maxed_out_window = g_current_ms; + + // if we don't have enough quota, we can't write regardless + if (USE_PACKET_PACING) { + if (send_quota / 100 < (int32)to_write) return false; + } + + // subtract one to save space for the FIN packet + if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) return false; + + // if sending another packet would not make the window exceed + // the max_window, we can write + if (cur_window + packet_size <= max_send) return true; + + // if the window size is less than a packet, and we have enough + // quota to send a packet, we can write, even though it would + // make the window exceed the max size + // the last condition is needed to not put too many packets + // in the send buffer. cur_window isn't updated until we flush + // the send buffer, so we need to take the number of packets + // into account + if (USE_PACKET_PACING) { + if (max_window < to_write && + cur_window < max_window && + cur_window_packets == 0) { + return true; + } + } + + return false; +} + +bool UTPSocket::flush_packets() +{ + size_t packet_size = get_packet_size(); + + // send packets that are waiting on the pacer to be sent + // i has to be an unsigned 16 bit counter to wrap correctly + // signed types are not guaranteed to wrap the way you expect + for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) { + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i); + if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue; + // have we run out of quota? + if (!is_writable(pkt->payload)) { + return true; + } + + // Nagle check + // don't send the last packet if we have one packet in-flight + // and the current packet is still smaller than packet_size. + if (i != ((seq_nr - 1) & ACK_NR_MASK) || + cur_window_packets == 1 || + pkt->payload >= packet_size) { + send_packet(pkt); + + // No need to send another ack if there is nothing to reorder. + if (reorder_count == 0) { + sent_ack(); + } + } + } + return false; +} + +void UTPSocket::write_outgoing_packet(size_t payload, uint flags) +{ + // Setup initial timeout timer + if (cur_window_packets == 0) { + retransmit_timeout = rto; + rto_timeout = g_current_ms + retransmit_timeout; + assert(cur_window == 0); + } + + size_t packet_size = get_packet_size(); + do { + assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE); + assert(flags == ST_DATA || flags == ST_FIN); + + size_t added = 0; + + OutgoingPacket *pkt = NULL; + + if (cur_window_packets > 0) { + pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1); + } + + const size_t header_size = get_header_size(); + bool append = true; + + // if there's any room left in the last packet in the window + // and it hasn't been sent yet, fill that frame first + if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) { + // Use the previous unsent packet + added = min(payload + pkt->payload, max(packet_size, pkt->payload)) - pkt->payload; + pkt = (OutgoingPacket*)realloc(pkt, + (sizeof(OutgoingPacket) - 1) + + header_size + + pkt->payload + added); + outbuf.put(seq_nr - 1, pkt); + append = false; + assert(!pkt->need_resend); + } else { + // Create the packet to send. 
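+			// (OutgoingPacket ends in a one-byte data[] placeholder, so the
+			// malloc below reserves sizeof(OutgoingPacket) - 1 plus room for
+			// the uTP header and the payload; both are then written straight
+			// into pkt->data, flexible-array style.)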
+ added = payload; + pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) + + header_size + + added); + pkt->payload = 0; + pkt->transmissions = 0; + pkt->need_resend = false; + } + + if (added) { + // Fill it with data from the upper layer. + func.on_write(userdata, pkt->data + header_size + pkt->payload, added); + } + pkt->payload += added; + pkt->length = header_size + pkt->payload; + + last_rcv_win = get_rcv_window(); + + PacketFormat* p = (PacketFormat*)pkt->data; + PacketFormatV1* p1 = (PacketFormatV1*)pkt->data; + if (version == 0) { + p->connid = conn_id_send; + p->ext = 0; + p->windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE); + p->ack_nr = ack_nr; + p->flags = flags; + } else { + p1->version = 1; + p1->type = flags; + p1->ext = 0; + p1->connid = conn_id_send; + p1->windowsize = (uint32)last_rcv_win; + p1->ack_nr = ack_nr; + } + + if (append) { + // Remember the message in the outgoing queue. + outbuf.ensure_size(seq_nr, cur_window_packets); + outbuf.put(seq_nr, pkt); + if (version == 0) p->seq_nr = seq_nr; + else p1->seq_nr = seq_nr; + seq_nr++; + cur_window_packets++; + } + + payload -= added; + + } while (payload); + + flush_packets(); +} + +void UTPSocket::update_send_quota() +{ + int dt = g_current_ms - last_send_quota; + if (dt == 0) return; + last_send_quota = g_current_ms; + size_t add = max_window * dt * 100 / (rtt_hist.delay_base?rtt_hist.delay_base:50); + if (add > max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100) add = max_window; + send_quota += (int32)add; +// LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d", +// this, dt, rtt, (uint)max_window, send_quota / 100); +} + +#ifdef _DEBUG +void UTPSocket::check_invariant() +{ + if (reorder_count > 0) { + assert(inbuf.get(ack_nr + 1) == NULL); + } + + size_t outstanding_bytes = 0; + for (int i = 0; i < cur_window_packets; ++i) { + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1); + if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue; + outstanding_bytes += pkt->payload; + } + assert(outstanding_bytes == cur_window); +} +#endif + +void UTPSocket::check_timeouts() +{ +#ifdef _DEBUG + check_invariant(); +#endif + + // this invariant should always be true + assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets)); + + LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d " + "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d", + this, (int)(rto_timeout - g_current_ms), (uint)max_window, (uint)cur_window, + send_quota / 100, statenames[state], cur_window_packets, + (uint)bytes_since_ack, (int)(g_current_ms - ack_time)); + + update_send_quota(); + flush_packets(); + + + if (USE_PACKET_PACING) { + // In case the new send quota made it possible to send another packet + // Mark the socket as writable. If we don't use pacing, the send + // quota does not affect if the socket is writeable + // if we don't use packet pacing, the writable event is triggered + // whenever the cur_window falls below the max_window, so we don't + // need this check then + if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) { + state = CS_CONNECTED; + LOG_UTPV("0x%08x: Socket writable. 
max_window:%u cur_window:%u quota:%d packet_size:%u", + this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size()); + func.on_state(userdata, UTP_STATE_WRITABLE); + } + } + + switch (state) { + case CS_SYN_SENT: + case CS_CONNECTED_FULL: + case CS_CONNECTED: + case CS_FIN_SENT: { + + // Reset max window... + if ((int)(g_current_ms - zerowindow_time) >= 0 && max_window_user == 0) { + max_window_user = PACKET_SIZE; + } + + if ((int)(g_current_ms - rto_timeout) >= 0 && + (!(USE_PACKET_PACING) || cur_window_packets > 0) && + rto_timeout > 0) { + + /* + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets); + + // If there were a lot of retransmissions, force recomputation of round trip time + if (pkt->transmissions >= 4) + rtt = 0; + */ + + // Increase RTO + const uint new_timeout = retransmit_timeout * 2; + if (new_timeout >= 30000 || (state == CS_SYN_SENT && new_timeout > 6000)) { + // more than 30 seconds with no reply. kill it. + // if we haven't even connected yet, give up sooner. 6 seconds + // means 2 tries at the following timeouts: 3, 6 seconds + if (state == CS_FIN_SENT) + state = CS_DESTROY; + else + state = CS_RESET; + func.on_error(userdata, ETIMEDOUT); + goto getout; + } + + retransmit_timeout = new_timeout; + rto_timeout = g_current_ms + new_timeout; + + // On Timeout + duplicate_ack = 0; + + // rate = min_rate + max_window = get_packet_size(); + send_quota = max((int32)max_window * 100, send_quota); + + // every packet should be considered lost + for (int i = 0; i < cur_window_packets; ++i) { + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1); + if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue; + pkt->need_resend = true; + assert(cur_window >= pkt->payload); + cur_window -= pkt->payload; + } + + // used in parse_log.py + LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u", + this, seq_nr - cur_window_packets, retransmit_timeout, (uint)max_window); + + fast_timeout = true; + timeout_seq_nr = seq_nr; + + if (cur_window_packets > 0) { + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets); + assert(pkt); + send_quota = max((int32)pkt->length * 100, send_quota); + + // Re-send the packet. + send_packet(pkt); + } + } + + // Mark the socket as writable + if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) { + state = CS_CONNECTED; + LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u", + this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size()); + func.on_state(userdata, UTP_STATE_WRITABLE); + } + + if (state >= CS_CONNECTED && state <= CS_FIN_SENT) { + // Send acknowledgment packets periodically, or when the threshold is reached + if (bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD || + (int)(g_current_ms - ack_time) >= 0) { + send_ack(); + } + + if ((int)(g_current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) { + send_keep_alive(); + } + } + + break; + } + + // Close? + case CS_GOT_FIN: + case CS_DESTROY_DELAY: + if ((int)(g_current_ms - rto_timeout) >= 0) { + state = (state == CS_DESTROY_DELAY) ? 
CS_DESTROY : CS_RESET; + if (cur_window_packets > 0 && userdata) { + func.on_error(userdata, ECONNRESET); + } + } + break; + // prevent warning + case CS_IDLE: + case CS_RESET: + case CS_DESTROY: + break; + } + + getout: + + // make sure we don't accumulate quota when we don't have + // anything to send + int32 limit = max((int32)max_window / 2, 5 * (int32)get_packet_size()) * 100; + if (send_quota > limit) send_quota = limit; +} + +// returns: +// 0: the packet was acked. +// 1: it means that the packet had already been acked +// 2: the packet has not been sent yet +int UTPSocket::ack_packet(uint16 seq) +{ + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq); + + // the packet has already been acked (or not sent) + if (pkt == NULL) { + LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", this, seq); + return 1; + } + + // can't ack packets that haven't been sent yet! + if (pkt->transmissions == 0) { + LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)", + this, seq, (uint)pkt->payload, pkt->need_resend); + return 2; + } + + LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)", + this, seq, (uint)pkt->payload, pkt->need_resend); + + outbuf.put(seq, NULL); + + // if we never re-sent the packet, update the RTT estimate + if (pkt->transmissions == 1) { + // Estimate the round trip time. + const uint32 ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000); + if (rtt == 0) { + // First round trip time sample + rtt = ertt; + rtt_var = ertt / 2; + // sanity check. rtt should never be more than 6 seconds +// assert(rtt < 6000); + } else { + // Compute new round trip times + const int delta = (int)rtt - ertt; + rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4; + rtt = rtt - rtt/8 + ertt/8; + // sanity check. rtt should never be more than 6 seconds +// assert(rtt < 6000); + rtt_hist.add_sample(ertt); + } + rto = max(rtt + rtt_var * 4, 500); + LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u", + this, ertt, rtt, rtt_var, rto); + } + retransmit_timeout = rto; + rto_timeout = g_current_ms + rto; + // if need_resend is set, this packet has already + // been considered timed-out, and is not included in + // the cur_window anymore + if (!pkt->need_resend) { + assert(cur_window >= pkt->payload); + cur_window -= pkt->payload; + } + free(pkt); + return 0; +} + +// count the number of bytes that were acked by the EACK header +size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt) +{ + if (cur_window_packets == 0) return 0; + + size_t acked_bytes = 0; + int bits = len * 8; + + do { + uint v = base + bits; + + // ignore bits that haven't been sent yet + // see comment in UTPSocket::selective_ack + if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1)) + continue; + + // ignore bits that represents packets we haven't sent yet + // or packets that have already been acked + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); + if (!pkt || pkt->transmissions == 0) + continue; + + // Count the number of segments that were successfully received past it. 
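+		// (mask[bits>>3] & (1 << (bits & 7)) selects bit `bits` of the
+		// little-endian bitfield: bits = 11, for example, tests bit 3 of
+		// mask[1], i.e. whether packet base + 11 was received.)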
+ if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) { + assert((int)(pkt->payload) >= 0); + acked_bytes += pkt->payload; + min_rtt = min(min_rtt, UTP_GetMicroseconds() - pkt->time_sent); + continue; + } + } while (--bits >= -1); + return acked_bytes; +} + +void UTPSocket::selective_ack(uint base, const byte *mask, byte len) +{ + if (cur_window_packets == 0) return; + + // the range is inclusive [0, 31] bits + int bits = len * 8 - 1; + + int count = 0; + + // resends is a stack of sequence numbers we need to resend. Since we + // iterate in reverse over the acked packets, at the end, the top packets + // are the ones we want to resend + int resends[32]; + int nr = 0; + + LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", this, *(uint32*)mask, base); + do { + // we're iterating over the bits from higher sequence numbers + // to lower (kind of in reverse order, wich might not be very + // intuitive) + uint v = base + bits; + + // ignore bits that haven't been sent yet + // and bits that fall below the ACKed sequence number + // this can happen if an EACK message gets + // reordered and arrives after a packet that ACKs up past + // the base for thie EACK message + + // this is essentially the same as: + // if v >= seq_nr || v <= seq_nr - cur_window_packets + // but it takes wrapping into account + + // if v == seq_nr the -1 will make it wrap. if v > seq_nr + // it will also wrap (since it will fall further below 0) + // and be > cur_window_packets. + // if v == seq_nr - cur_window_packets, the result will be + // seq_nr - (seq_nr - cur_window_packets) - 1 + // == seq_nr - seq_nr + cur_window_packets - 1 + // == cur_window_packets - 1 which will be caught by the + // test. If v < seq_nr - cur_window_packets the result will grow + // fall furhter outside of the cur_window_packets range. + + // sequence number space: + // + // rejected < accepted > rejected + // <============+--------------+============> + // ^ ^ + // | | + // (seq_nr-wnd) seq_nr + + if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1)) + continue; + + // this counts as a duplicate ack, even though we might have + // received an ack for this packet previously (in another EACK + // message for instance) + bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7)); + + // if this packet is acked, it counts towards the duplicate ack counter + if (bit_set) count++; + + // ignore bits that represents packets we haven't sent yet + // or packets that have already been acked + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); + if (!pkt || pkt->transmissions == 0) { + LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s", + this, v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)"); + continue; + } + + // Count the number of segments that were successfully received past it. + if (bit_set) { + // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets + assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask)); + ack_packet(v); + continue; + } + + // Resend segments + // if count is less than our re-send limit, we haven't seen enough + // acked packets in front of this one to warrant a re-send. 
+ // if count == 0, we're still going through the tail of zeroes + if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE && + count >= DUPLICATE_ACKS_BEFORE_RESEND && + duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) { + resends[nr++] = v; + LOG_UTPV("0x%08x: no ack for %u", this, v); + } else { + LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", + this, v, count, duplicate_ack, fast_resend_seq_nr); + } + } while (--bits >= -1); + + if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) < 256 && + count >= DUPLICATE_ACKS_BEFORE_RESEND && + duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) { + // if we get enough duplicate acks to start + // resending, the first packet we should resend + // is base-1 + resends[nr++] = base - 1; + } else { + LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", + this, base - 1, count, duplicate_ack, fast_resend_seq_nr); + } + + bool back_off = false; + int i = 0; + while (nr > 0) { + uint v = resends[--nr]; + // don't consider the tail of 0:es to be lost packets + // only unacked packets with acked packets after should + // be considered lost + OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v); + + // this may be an old (re-ordered) packet, and some of the + // packets in here may have been acked already. In which + // case they will not be in the send queue anymore + if (!pkt) continue; + + // used in parse_log.py + LOG_UTP("0x%08x: Packet %u lost. Resending", this, v); + + // On Loss + back_off = true; +#ifdef _DEBUG + ++_stats._rexmit; +#endif + send_packet(pkt); + fast_resend_seq_nr = v + 1; + + // Re-send max 4 packets. + if (++i >= 4) break; + } + + if (back_off) + maybe_decay_win(); + + duplicate_ack = count; +} + +void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt) +{ + // the delay can never be greater than the rtt. The min_rtt + // variable is the RTT in microseconds + + assert(min_rtt >= 0); + int32 our_delay = min(our_hist.get_value(), uint32(min_rtt)); + assert(our_delay != INT_MAX); + assert(our_delay >= 0); + assert(our_hist.get_value() >= 0); + + SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(); + UTP_DelaySample((sockaddr*)&sa, our_delay / 1000); + + // This test the connection under heavy load from foreground + // traffic. Pretend that our delays are very high to force the + // connection to use sub-packet size window sizes + //our_delay *= 4; + + // target is microseconds + int target = CCONTROL_TARGET; + if (target <= 0) target = 100000; + + double off_target = target - our_delay; + + // this is the same as: + // + // (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT + // + // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction + // of the target delay the current delay represents. + // The min() around off_target protects against crazy values of our_delay, which may happen when th + // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase + // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt. + // as for large negative numbers, this direction is already capped at the min packet size further down + // the min around the bytes_acked protects against the case where the window size was recently + // shrunk and the number of acked bytes exceeds that. This is considered no more than one full + // window, in order to keep the gain within sane boundries. 
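+	// (Worked example with illustrative numbers: taking target = 100000
+	// usec, our_delay = 50000 usec and one full window acked gives
+	// delay_factor = 0.5 and window_factor = 1, so max_window grows by
+	// 0.5 * MAX_CWND_INCREASE_BYTES_PER_RTT = 1500 bytes over that RTT;
+	// at our_delay = 150000 usec the delay_factor is -0.5 and the window
+	// shrinks by the same amount instead.)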
+ + assert(bytes_acked > 0); + double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked); + double delay_factor = off_target / target; + double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor; + + // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window) + // may increase per RTT, we may not increase the window size more than that proportional + // to the number of bytes that were acked, so that once one window has been acked (one rtt) + // the increase limit is not exceeded + // the +1. is to allow for floating point imprecision + assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked)); + + if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) { + // if it was more than 300 milliseconds since we tried to send a packet + // and stopped because we hit the max window, we're most likely rate + // limited (which prevents us from ever hitting the window size) + // if this is the case, we cannot let the max_window grow indefinitely + scaled_gain = 0; + } + + if (scaled_gain + max_window < MIN_WINDOW_SIZE) { + max_window = MIN_WINDOW_SIZE; + } else { + max_window = (size_t)(max_window + scaled_gain); + } + + // make sure that the congestion window is below max + // make sure that we don't shrink our window too small + max_window = clamp(max_window, MIN_WINDOW_SIZE, opt_sndbuf); + + // used in parse_log.py + LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u " + "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u " + "scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" " + "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u", + this, actual_delay, our_delay / 1000, their_hist.get_value() / 1000, + (int)off_target / 1000, (uint)(max_window), our_hist.delay_base, + (our_delay + their_hist.get_value()) / 1000, target / 1000, (uint)bytes_acked, + (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt, + (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)), + send_quota / 100, (uint)max_window_user, rto, (int)(rto_timeout - g_current_ms), + UTP_GetMicroseconds(), cur_window_packets, (uint)get_packet_size(), + their_hist.delay_base, their_hist.delay_base + their_hist.get_value()); +} + +static void UTP_RegisterRecvPacket(UTPSocket *conn, size_t len) +{ +#ifdef _DEBUG + ++conn->_stats._nrecv; + conn->_stats._nbytes_recv += len; +#endif + + if (len <= PACKET_SIZE_MID) { + if (len <= PACKET_SIZE_EMPTY) { + _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++; + } else if (len <= PACKET_SIZE_SMALL) { + _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++; + } else + _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++; + } else { + if (len <= PACKET_SIZE_BIG) { + _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++; + } else + _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++; + } +} + +// returns the max number of bytes of payload the uTP +// connection is allowed to send +size_t UTPSocket::get_packet_size() +{ + int header_size = version == 1 + ? 
sizeof(PacketFormatV1)
+ : sizeof(PacketFormat);
+
+ size_t mtu = get_udp_mtu();
+
+ if (DYNAMIC_PACKET_SIZE_ENABLED) {
+ SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
+ size_t max_packet_size = UTP_GetPacketSize((sockaddr*)&sa);
+ return min(mtu - header_size, max_packet_size);
+ }
+ else
+ {
+ return mtu - header_size;
+ }
+}
+
+// Process an incoming packet
+// syn is true if this is the first packet received. It will cut off parsing
+// as soon as the header is done
+size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
+{
+ UTP_RegisterRecvPacket(conn, len);
+
+ g_current_ms = UTP_GetMilliseconds();
+
+ conn->update_send_quota();
+
+ const PacketFormat *pf = (PacketFormat*)packet;
+ const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
+ const byte *packet_end = packet + len;
+
+ uint16 pk_seq_nr;
+ uint16 pk_ack_nr;
+ uint8 pk_flags;
+ if (conn->version == 0) {
+ pk_seq_nr = pf->seq_nr;
+ pk_ack_nr = pf->ack_nr;
+ pk_flags = pf->flags;
+ } else {
+ pk_seq_nr = pf1->seq_nr;
+ pk_ack_nr = pf1->ack_nr;
+ pk_flags = pf1->type;
+ }
+
+ if (pk_flags >= ST_NUM_STATES) return 0;
+
+ LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u",
+ conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version,
+ conn->version == 0?(uint64)(pf->tv_sec) * 1000000 + pf->tv_usec:uint64(pf1->tv_usec),
+ conn->version == 0?(uint32)(pf->reply_micro):(uint32)(pf1->reply_micro));
+
+ // mark receipt time
+ uint64 time = UTP_GetMicroseconds();
+
+ // RSTs are handled earlier, since the connid matches the send id not the recv id
+ assert(pk_flags != ST_RESET);
+
+ // TODO: maybe send a ST_RESET if we're in CS_RESET?
+
+ const byte *selack_ptr = NULL;
+
+ // Unpack UTP packet options
+ // Data pointer
+ const byte *data = (const byte*)pf + conn->get_header_size();
+ if (conn->get_header_size() > len) {
+ LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
+ return 0;
+ }
+ // Skip the extension headers
+ uint extension = conn->version == 0 ? pf->ext : pf1->ext;
+ if (extension != 0) {
+ do {
+ // Verify that the packet is valid.
+ data += 2;
+
+ if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
+ LOG_UTPV("0x%08x: Invalid len of extensions", conn);
+ return 0;
+ }
+
+ switch(extension) {
+ case 1: // Selective Acknowledgment
+ selack_ptr = data;
+ break;
+ case 2: // extension bits
+ if (data[-1] != 8) {
+ LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
+ return 0;
+ }
+ memcpy(conn->extensions, data, 8);
+ LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn,
+ conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
+ conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
+ }
+ extension = data[-2];
+ data += data[-1];
+ } while (extension);
+ }
+
+ if (conn->state == CS_SYN_SENT) {
+ // if this is a syn-ack, initialize our ack_nr
+ // to match the sequence number we got from
+ // the other end
+ conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
+ }
+
+ g_current_ms = UTP_GetMilliseconds();
+ conn->last_got_packet = g_current_ms;
+
+ if (syn) {
+ return 0;
+ }
+
+ // seqnr is the number of packets past the expected
+ // packet this is. ack_nr is the last acked, seq_nr is the
+ // current. Subtracting 1 makes 0 mean "this is the next
+ // expected packet".
+ const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
+
+ // Getting an invalid sequence number?
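+ // For example, assuming a 16-bit sequence space (SEQ_NR_MASK == 0xffff):
+ // a duplicate of the last in-order packet arrives with pk_seq_nr equal to
+ // conn->ack_nr, so seqnr wraps to 0xffff and lands in the "old packet"
+ // branch below rather than in the reorder buffer.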
+ if (seqnr >= REORDER_BUFFER_MAX_SIZE) { + if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) { + conn->ack_time = g_current_ms + min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD); + } + LOG_UTPV(" Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr); + return 0; + } + + // Process acknowledgment + // acks is the number of packets that was acked + int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK; + + // this happens when we receive an old ack nr + if (acks > conn->cur_window_packets) acks = 0; + + // if we get the same ack_nr as in the last packet + // increase the duplicate_ack counter, otherwise reset + // it to 0 + if (conn->cur_window_packets > 0) { + if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) && + conn->cur_window_packets > 0) { + //++conn->duplicate_ack; + } else { + conn->duplicate_ack = 0; + } + + // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND + // and fast_resend_seq_nr <= ack_nr + 1 + // resend ack_nr + 1 + } + + // figure out how many bytes were acked + size_t acked_bytes = 0; + + // the minimum rtt of all acks + // this is the upper limit on the delay we get back + // from the other peer. Our delay cannot exceed + // the rtt of the packet. If it does, clamp it. + // this is done in apply_ledbat_ccontrol() + int64 min_rtt = INT64_MAX; + + for (int i = 0; i < acks; ++i) { + int seq = conn->seq_nr - conn->cur_window_packets + i; + OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq); + if (pkt == 0 || pkt->transmissions == 0) continue; + assert((int)(pkt->payload) >= 0); + acked_bytes += pkt->payload; + min_rtt = min(min_rtt, UTP_GetMicroseconds() - pkt->time_sent); + } + + // count bytes acked by EACK + if (selack_ptr != NULL) { + acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK, + selack_ptr, selack_ptr[-1], min_rtt); + } + + LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u", + conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets, + seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt); + + uint64 p; + + if (conn->version == 0) { + p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec; + } else { + p = pf1->tv_usec; + } + + conn->last_measured_delay = g_current_ms; + + // get delay in both directions + // record the delay to report back + const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p); + conn->reply_micro = their_delay; + uint32 prev_delay_base = conn->their_hist.delay_base; + if (their_delay != 0) conn->their_hist.add_sample(their_delay); + + // if their new delay base is less than their previous one + // we should shift our delay base in the other direction in order + // to take the clock skew into account + if (prev_delay_base != 0 && + wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) { + // never adjust more than 10 milliseconds + if (prev_delay_base - conn->their_hist.delay_base <= 10000) { + conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base); + } + } + + const uint32 actual_delay = conn->version==0 + ?(pf->reply_micro==INT_MAX?0:uint32(pf->reply_micro)) + :(uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro)); + + assert(conn->our_hist.get_value() >= 0); + // if the actual delay is 0, it means the other end + // hasn't received a sample from us yet, and doesn't + // know what it is. 
We can't update our history unless
+ // we have a true measured sample
+ prev_delay_base = conn->our_hist.delay_base;
+ if (actual_delay != 0) conn->our_hist.add_sample(actual_delay);
+ assert(conn->our_hist.get_value() >= 0);
+
+ // if our new delay base is less than our previous one
+ // we should shift the other end's delay base in the other
+ // direction in order to take the clock skew into account
+ // This is commented out because it creates bad interactions
+ // with our adjustment in the other direction. We don't really
+ // need our estimates of the other peer to be very accurate
+ // anyway. The problem with shifting here is that we're more
+ // likely shift it back later because of a low latency. This
+ // second shift back would cause us to shift our delay base
+ // which then gets into a death spiral of shifting delay bases
+/* if (prev_delay_base != 0 &&
+ wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
+ // never adjust more than 10 milliseconds
+ if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
+ conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
+ }
+ }
+*/
+ // only apply the congestion controller on acks
+ // if we don't have a delay measurement, there's
+ // no point in invoking the congestion control
+ if (actual_delay != 0 && acked_bytes >= 1)
+ conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt);
+
+ // sanity check, the other end should never ack packets
+ // past the point we've sent
+ if (acks <= conn->cur_window_packets) {
+ conn->max_window_user = conn->version == 0
+ ? pf->windowsize * PACKET_SIZE : pf1->windowsize;
+
+ // If max user window is set to 0, then we start up a timer
+ // that will reset it to 1 after 15 seconds.
+ if (conn->max_window_user == 0)
+ // Reset max_window_user to 1 every 15 seconds.
+ conn->zerowindow_time = g_current_ms + 15000;
+
+ // Respond to connect message
+ // Switch to CONNECTED state.
+ if (conn->state == CS_SYN_SENT) {
+ conn->state = CS_CONNECTED;
+ conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);
+
+ // We've sent a fin, and everything was ACKed (including the FIN),
+ // it's safe to destroy the socket. cur_window_packets == acks
+ // means that this packet acked all the remaining packets that
+ // were in-flight.
+ } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
+ conn->state = CS_DESTROY;
+ }
+
+ // Update fast resend counter
+ if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
+ conn->fast_resend_seq_nr = pk_ack_nr + 1;
+
+ LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);
+
+ for (int i = 0; i < acks; ++i) {
+ int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
+ // if ack_status is 0, the packet was acked.
+ // if ack_status is 1, it means that the packet had already been acked
+ // if it's 2, the packet has not been sent yet
+ // We need to break this loop in the latter case. This could potentially
+ // happen if we get an ack_nr that does not exceed what we have stuffed
+ // into the outgoing buffer, but does exceed what we have sent
+ if (ack_status == 2) {
+#ifdef _DEBUG
+ OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
+ assert(pkt->transmissions == 0);
+#endif
+ break;
+ }
+ conn->cur_window_packets--;
+ }
+#ifdef _DEBUG
+ if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
+#endif
+
+ // packets in front of this may have been acked by a
+ // selective ack (EACK).
Keep decreasing the window packet size
+ // until we hit a packet that is still waiting to be acked
+ // in the send queue
+ // this is especially likely to happen when the other end
+ // has the EACK send bug that older versions of uTP had
+ while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets))
+ conn->cur_window_packets--;
+
+#ifdef _DEBUG
+ if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
+#endif
+
+ // this invariant should always be true
+ assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
+
+ // flush Nagle
+ if (conn->cur_window_packets == 1) {
+ OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
+ // do we still have quota?
+ if (pkt->transmissions == 0 &&
+ (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)pkt->length)) {
+ conn->send_packet(pkt);
+
+ // No need to send another ack if there is nothing to reorder.
+ if (conn->reorder_count == 0) {
+ conn->sent_ack();
+ }
+ }
+ }
+
+ // Fast timeout-retry
+ if (conn->fast_timeout) {
+ LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
+ if (((conn->fast_resend_seq_nr - conn->timeout_seq_nr) & ACK_NR_MASK) >= 0 ||
+ ((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
+ conn->fast_timeout = false;
+ } else {
+ OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
+ if (pkt && pkt->transmissions > 0) {
+ LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets);
+#ifdef _DEBUG
+ ++conn->_stats._fastrexmit;
+#endif
+ conn->fast_resend_seq_nr++;
+ conn->send_packet(pkt);
+ }
+ }
+ }
+ }
+
+ // Process selective acknowledgment
+ if (selack_ptr != NULL) {
+ conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
+ }
+
+ // this invariant should always be true
+ assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
+
+ LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d",
+ conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
+ conn->send_quota / 100);
+
+ // In case the ack dropped the current window below
+ // the max_window size, mark the socket as writable
+ if (conn->state == CS_CONNECTED_FULL && conn->is_writable(conn->get_packet_size())) {
+ conn->state = CS_CONNECTED;
+ LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
+ conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)conn->get_packet_size());
+ conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
+ }
+
+ if (pk_flags == ST_STATE) {
+ // This is a state packet only.
+ return 0;
+ }
+
+ // The connection is not in a state that can accept data?
+ if (conn->state != CS_CONNECTED &&
+ conn->state != CS_CONNECTED_FULL &&
+ conn->state != CS_FIN_SENT) {
+ return 0;
+ }
+
+ // Is this a finalize packet?
+ if (pk_flags == ST_FIN && !conn->got_fin) {
+ LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
+ conn->got_fin = true;
+ conn->eof_pkt = pk_seq_nr;
+ // at this point, it is possible for the
+ // other end to have sent packets with
+ // sequence numbers higher than seq_nr.
+ // if this is the case, our reorder_count
+ // is out of sync. This case is dealt with
+ // when we re-order and hit the eof_pkt.
+ // we'll just ignore any packets with
+ // sequence numbers past this
+ }
+
+ // Getting an in-order packet?
+ if (seqnr == 0) {
+ size_t count = packet_end - data;
+ if (count > 0 && conn->state != CS_FIN_SENT) {
+ LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata));
+ // Post bytes to the upper layer
+ conn->func.on_read(conn->userdata, data, count);
+ }
+ conn->ack_nr++;
+ conn->bytes_since_ack += count;
+
+ // Check if the next packet has been received too, but waiting
+ // in the reorder buffer.
+ for (;;) {
+
+ if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
+ if (conn->state != CS_FIN_SENT) {
+ conn->state = CS_GOT_FIN;
+ conn->rto_timeout = g_current_ms + min(conn->rto * 3, 60);
+
+ LOG_UTPV("0x%08x: Posting EOF", conn);
+ conn->func.on_state(conn->userdata, UTP_STATE_EOF);
+ }
+
+ // if the other end wants to close, ack immediately
+ conn->send_ack();
+
+ // reorder_count is not necessarily 0 at this point.
+ // even though it is most of the time, the other end
+ // may have sent packets with higher sequence numbers
+ // than what later ends up being eof_pkt
+ // since we have received all packets up to eof_pkt
+ // just ignore the ones after it.
+ conn->reorder_count = 0;
+ }
+
+ // Quick get-out in case there is nothing to reorder
+ if (conn->reorder_count == 0)
+ break;
+
+ // Check if there are additional buffers in the reorder buffers
+ // that need delivery.
+ byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
+ if (p == NULL)
+ break;
+ conn->inbuf.put(conn->ack_nr+1, NULL);
+ count = *(uint*)p;
+ if (count > 0 && conn->state != CS_FIN_SENT) {
+ // Pass the bytes to the upper layer
+ conn->func.on_read(conn->userdata, p + sizeof(uint), count);
+ }
+ conn->ack_nr++;
+ conn->bytes_since_ack += count;
+
+ // Free the element from the reorder buffer
+ free(p);
+ assert(conn->reorder_count > 0);
+ conn->reorder_count--;
+ }
+
+ // start the delayed ACK timer
+ conn->ack_time = g_current_ms + min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
+ } else {
+ // Getting an out of order packet.
+ // The packet needs to be remembered and rearranged later.
+
+ // if we have received a FIN packet, and the EOF-sequence number
+ // is lower than the sequence number of the packet we just received,
+ // something is wrong.
+ if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
+ LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF "
+ "reorder_count:%u len:%u (rb:%u)",
+ conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+ return 0;
+ }
+
+ // if the sequence number is entirely off the expected
+ // one, just drop it. We can't allocate buffer space in
+ // the inbuf entirely based on untrusted input
+ if (seqnr > 0x3ff) {
+ LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off "
+ "reorder_count:%u len:%u (rb:%u)",
+ conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+ return 0;
+ }
+
+ // we need to grow the circular buffer before we
+ // check if the packet is already in here, so that
+ // we don't end up looking at an older packet (since
+ // the indices wrap around).
+ conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
+
+ // Has this packet already been received? (i.e. a duplicate)
+ // If that is the case, just discard it.
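+ // For example (hypothetical numbers): if ack_nr is 10 and pk_seq_nr is 14,
+ // seqnr is 3, so ensure_size() above guarantees room for at least
+ // seqnr + 1 = 4 slots beyond the expected packet before this check runs.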
+ if (conn->inbuf.get(pk_seq_nr) != NULL) {
+#ifdef _DEBUG
+ ++conn->_stats._nduprecv;
+#endif
+ return 0;
+ }
+
+ // Allocate memory to fit the packet that needs to be re-ordered
+ byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
+ *(uint*)mem = (uint)(packet_end - data);
+ memcpy(mem + sizeof(uint), data, packet_end - data);
+
+ // Insert into reorder buffer and increment the count
+ // of # of packets to be reordered.
+ // we add one to seqnr in order to leave the last
+ // entry empty, that way the assert in send_ack
+ // is valid. we have to add one to seqnr too, in order
+ // to make the circular buffer grow around the correct
+ // point (which is conn->ack_nr + 1).
+ assert(conn->inbuf.get(pk_seq_nr) == NULL);
+ assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
+ conn->inbuf.put(pk_seq_nr, mem);
+ conn->reorder_count++;
+
+ LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
+ conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+
+ // Set up so the partial ACK message will get sent immediately.
+ conn->ack_time = g_current_ms + min(conn->ack_time - g_current_ms, 1);
+ }
+
+ // If ack_time or ack_bytes indicate that we need to send an ack, send one
+ // here instead of waiting for the timer to trigger
+ LOG_UTPV("bytes_since_ack:%u ack_time:%d",
+ (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
+ if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) {
+ if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
+ (int)(g_current_ms - conn->ack_time) >= 0) {
+ conn->send_ack();
+ }
+ }
+ return (size_t)(packet_end - data);
+}
+
+inline bool UTP_IsV1(PacketFormatV1 const* pf)
+{
+ return pf->version == 1 && pf->type < ST_NUM_STATES && pf->ext < 3;
+}
+
+void UTP_Free(UTPSocket *conn)
+{
+ LOG_UTPV("0x%08x: Killing socket", conn);
+
+ conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
+ UTP_SetCallbacks(conn, NULL, NULL);
+
+ assert(conn->idx < g_utp_sockets.GetCount());
+ assert(g_utp_sockets[conn->idx] == conn);
+
+ // Unlink object from the global list
+ assert(g_utp_sockets.GetCount() > 0);
+
+ UTPSocket *last = g_utp_sockets[g_utp_sockets.GetCount() - 1];
+
+ assert(last->idx < g_utp_sockets.GetCount());
+ assert(g_utp_sockets[last->idx] == last);
+
+ last->idx = conn->idx;
+
+ g_utp_sockets[conn->idx] = last;
+
+ // Decrease the count
+ g_utp_sockets.SetCount(g_utp_sockets.GetCount() - 1);
+
+ // Free all memory occupied by the socket object.
+ for (size_t i = 0; i <= conn->inbuf.mask; i++) { + free(conn->inbuf.elements[i]); + } + for (size_t i = 0; i <= conn->outbuf.mask; i++) { + free(conn->outbuf.elements[i]); + } + free(conn->inbuf.elements); + free(conn->outbuf.elements); + + // Finally free the socket object + free(conn); +} + + +// Public functions: +/////////////////////////////////////////////////////////////////////////////// + +// Create a UTP socket +UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen) +{ + UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket)); + + g_current_ms = UTP_GetMilliseconds(); + + UTP_SetCallbacks(conn, NULL, NULL); + conn->our_hist.clear(); + conn->their_hist.clear(); + conn->rto = 3000; + conn->rtt_var = 800; + conn->seq_nr = 1; + conn->ack_nr = 0; + conn->max_window_user = 255 * PACKET_SIZE; + conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen); + conn->send_to_proc = send_to_proc; + conn->send_to_userdata = send_to_userdata; + conn->ack_time = g_current_ms + 0x70000000; + conn->last_got_packet = g_current_ms; + conn->last_sent_packet = g_current_ms; + conn->last_measured_delay = g_current_ms + 0x70000000; + conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY; + conn->last_send_quota = g_current_ms; + conn->send_quota = PACKET_SIZE * 100; + conn->cur_window_packets = 0; + conn->fast_resend_seq_nr = conn->seq_nr; + + // default to version 1 + UTP_SetSockopt(conn, SO_UTPVERSION, 1); + + // we need to fit one packet in the window + // when we start the connection + conn->max_window = conn->get_packet_size(); + conn->state = CS_IDLE; + + conn->outbuf.mask = 15; + conn->inbuf.mask = 15; + + conn->outbuf.elements = (void**)calloc(16, sizeof(void*)); + conn->inbuf.elements = (void**)calloc(16, sizeof(void*)); + + conn->idx = g_utp_sockets.Append(conn); + + LOG_UTPV("0x%08x: UTP_Create", conn); + + return conn; +} + +void UTP_SetCallbacks(UTPSocket *conn, UTPFunctionTable *funcs, void *userdata) +{ + assert(conn); + + if (funcs == NULL) { + funcs = &zero_funcs; + } + conn->func = *funcs; + conn->userdata = userdata; +} + +bool UTP_SetSockopt(UTPSocket* conn, int opt, int val) +{ + assert(conn); + + switch (opt) { + case SO_SNDBUF: + assert(val >= 1); + conn->opt_sndbuf = val; + return true; + case SO_RCVBUF: + conn->opt_rcvbuf = val; + return true; + case SO_UTPVERSION: + assert(conn->state == CS_IDLE); + if (conn->state != CS_IDLE) { + // too late + return false; + } + if (conn->version == 1 && val == 0) { + conn->reply_micro = INT_MAX; + conn->opt_rcvbuf = 200 * 1024; + conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE; + } else if (conn->version == 0 && val == 1) { + conn->reply_micro = 0; + conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024; + conn->opt_sndbuf = conn->opt_rcvbuf; + } + conn->version = val; + return true; + } + + return false; +} + +// Try to connect to a specified host. +// 'initial' is the number of data bytes to send in the connect packet. 
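+// For illustration only -- a minimal sketch of the calling side, assuming a
+// user-supplied UDP send routine and callback table (all names below are
+// hypothetical, not part of libutp):
+//
+// void my_send_to(void *userdata, const byte *p, size_t len,
+// const struct sockaddr *to, socklen_t tolen) { /* call sendto() here */ }
+//
+// UTPFunctionTable my_funcs = { my_on_read, my_on_write, my_get_rb_size,
+// my_on_state, my_on_error, my_on_overhead };
+//
+// UTPSocket *s = UTP_Create(&my_send_to, NULL, (sockaddr*)&sin, sizeof(sin));
+// UTP_SetCallbacks(s, &my_funcs, my_userdata);
+// UTP_Connect(s);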
+void UTP_Connect(UTPSocket *conn) +{ + assert(conn); + + assert(conn->state == CS_IDLE); + assert(conn->cur_window_packets == 0); + assert(conn->outbuf.get(conn->seq_nr) == NULL); + assert(sizeof(PacketFormatV1) == 20); + + conn->state = CS_SYN_SENT; + + g_current_ms = UTP_GetMilliseconds(); + + // Create and send a connect message + uint32 conn_seed = UTP_Random(); + + // we identify newer versions by setting the + // first two bytes to 0x0001 + if (conn->version > 0) { + conn_seed &= 0xffff; + } + + // used in parse_log.py + LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) " + "target_delay:%u (ms) delay_history:%u " + "delay_base_history:%u (minutes)", + conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000, + CUR_DELAY_SIZE, DELAY_BASE_HISTORY); + + // Setup initial timeout timer. + conn->retransmit_timeout = 3000; + conn->rto_timeout = g_current_ms + conn->retransmit_timeout; + conn->last_rcv_win = conn->get_rcv_window(); + + conn->conn_seed = conn_seed; + conn->conn_id_recv = conn_seed; + conn->conn_id_send = conn_seed+1; + // if you need compatibiltiy with 1.8.1, use this. it increases attackability though. + //conn->seq_nr = 1; + conn->seq_nr = UTP_Random(); + + // Create the connect packet. + const size_t header_ext_size = conn->get_header_extensions_size(); + + OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_ext_size); + + PacketFormatExtensions* p = (PacketFormatExtensions*)pkt->data; + PacketFormatExtensionsV1* p1 = (PacketFormatExtensionsV1*)pkt->data; + + memset(p, 0, header_ext_size); + // SYN packets are special, and have the receive ID in the connid field, + // instead of conn_id_send. + if (conn->version == 0) { + p->pf.connid = conn->conn_id_recv; + p->pf.ext = 2; + p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE); + p->pf.seq_nr = conn->seq_nr; + p->pf.flags = ST_SYN; + p->ext_next = 0; + p->ext_len = 8; + memset(p->extensions, 0, 8); + } else { + p1->pf.version = 1; + p1->pf.type = ST_SYN; + p1->pf.ext = 2; + p1->pf.connid = conn->conn_id_recv; + p1->pf.windowsize = (uint32)conn->last_rcv_win; + p1->pf.seq_nr = conn->seq_nr; + p1->ext_next = 0; + p1->ext_len = 8; + memset(p1->extensions, 0, 8); + } + pkt->transmissions = 0; + pkt->length = header_ext_size; + pkt->payload = 0; + + //LOG_UTPV("0x%08x: Sending connect %s [%u].", + // conn, addrfmt(conn->addr, addrbuf), conn_seed); + + // Remember the message in the outgoing queue. + conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets); + conn->outbuf.put(conn->seq_nr, pkt); + conn->seq_nr++; + conn->cur_window_packets++; + + conn->send_packet(pkt); +} + +bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc, + SendToProc *send_to_proc, void *send_to_userdata, + const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen) +{ + const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen); + + if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) { + LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len); + return false; + } + + const PacketFormat* p = (PacketFormat*)buffer; + const PacketFormatV1* p1 = (PacketFormatV1*)buffer; + + const byte version = UTP_IsV1(p1); + const uint32 id = (version == 0) ? 
p->connid : uint32(p1->connid);
+
+ if (version == 0 && len < sizeof(PacketFormat)) {
+ LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
+ return false;
+ }
+
+ if (version == 1 && len < sizeof(PacketFormatV1)) {
+ LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
+ return false;
+ }
+
+ LOG_UTPV("recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
+
+ const PacketFormat *pf = (PacketFormat*)p;
+ const PacketFormatV1 *pf1 = (PacketFormatV1*)p;
+
+ if (version == 0) {
+ LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf->seq_nr, (uint)pf->ack_nr);
+ } else {
+ LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
+ }
+
+ const byte flags = version == 0 ? pf->flags : pf1->type;
+
+ for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) {
+ UTPSocket *conn = g_utp_sockets[i];
+ //LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
+ // addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id);
+ if (conn->addr != addr)
+ continue;
+
+ if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
+ LOG_UTPV("0x%08x: recv RST for existing connection", conn);
+ if (!conn->userdata || conn->state == CS_FIN_SENT) {
+ conn->state = CS_DESTROY;
+ } else {
+ conn->state = CS_RESET;
+ }
+ if (conn->userdata) {
+ conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(),
+ close_overhead);
+ const int err = conn->state == CS_SYN_SENT ?
+ ECONNREFUSED :
+ ECONNRESET;
+ conn->func.on_error(conn->userdata, err);
+ }
+ return true;
+ } else if (flags != ST_SYN && conn->conn_id_recv == id) {
+ LOG_UTPV("0x%08x: recv processing", conn);
+ const size_t read = UTP_ProcessIncoming(conn, buffer, len);
+ if (conn->userdata) {
+ conn->func.on_overhead(conn->userdata, false,
+ (len - read) + conn->get_udp_overhead(),
+ header_overhead);
+ }
+ return true;
+ }
+ }
+
+ if (flags == ST_RESET) {
+ LOG_UTPV("recv RST for unknown connection");
+ return true;
+ }
+
+ const uint32 seq_nr = version == 0 ? pf->seq_nr : pf1->seq_nr;
+ if (flags != ST_SYN) {
+ for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
+ if (g_rst_info[i].connid != id)
+ continue;
+ if (g_rst_info[i].addr != addr)
+ continue;
+ if (seq_nr != g_rst_info[i].ack_nr)
+ continue;
+ g_rst_info[i].timestamp = UTP_GetMilliseconds();
+ LOG_UTPV("recv not sending RST to non-SYN (stored)");
+ return true;
+ }
+ if (g_rst_info.GetCount() > RST_INFO_LIMIT) {
+ LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)g_rst_info.GetCount());
+ return true;
+ }
+ LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)g_rst_info.GetCount());
+ RST_Info &r = g_rst_info.Append();
+ r.addr = addr;
+ r.connid = id;
+ r.ack_nr = seq_nr;
+ r.timestamp = UTP_GetMilliseconds();
+
+ UTPSocket::send_rst(send_to_proc, send_to_userdata, addr, id, seq_nr, UTP_Random(), version);
+ return true;
+ }
+
+ if (incoming_proc) {
+ LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version);
+
+ // Create a new UTP socket to handle this new connection
+ UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
+ // Need to track this value to be able to detect duplicate CONNECTs
+ conn->conn_seed = id;
+ // This is the value that identifies this connection for them.
+ conn->conn_id_send = id;
+ // This is the value that identifies this connection for us.
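+ // (e.g. if the incoming SYN carried connid 100, we send with
+ // conn_id_send == 100 and expect incoming packets tagged 101)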
+ conn->conn_id_recv = id+1; + conn->ack_nr = seq_nr; + conn->seq_nr = UTP_Random(); + conn->fast_resend_seq_nr = conn->seq_nr; + + UTP_SetSockopt(conn, SO_UTPVERSION, version); + conn->state = CS_CONNECTED; + + const size_t read = UTP_ProcessIncoming(conn, buffer, len, true); + + LOG_UTPV("0x%08x: recv send connect ACK", conn); + conn->send_ack(true); + + incoming_proc(send_to_userdata, conn); + + // we report overhead after incoming_proc, because the callbacks are setup now + if (conn->userdata) { + // SYN + conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(), + header_overhead); + // SYNACK + conn->func.on_overhead(conn->userdata, true, conn->get_overhead(), + ack_overhead); + } + } + + return true; +} + +bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen) +{ + const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen); + + // Want the whole packet so we have connection ID + if (len < sizeof(PacketFormat)) { + return false; + } + + const PacketFormat* p = (PacketFormat*)buffer; + const PacketFormatV1* p1 = (PacketFormatV1*)buffer; + + const byte version = UTP_IsV1(p1); + const uint32 id = (version == 0) ? p->connid : uint32(p1->connid); + + for (size_t i = 0; i < g_utp_sockets.GetCount(); ++i) { + UTPSocket *conn = g_utp_sockets[i]; + if (conn->addr == addr && + conn->conn_id_recv == id) { + // Don't pass on errors for idle/closed connections + if (conn->state != CS_IDLE) { + if (!conn->userdata || conn->state == CS_FIN_SENT) { + LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn); + conn->state = CS_DESTROY; + } else { + conn->state = CS_RESET; + } + if (conn->userdata) { + const int err = conn->state == CS_SYN_SENT ? + ECONNREFUSED : + ECONNRESET; + LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err); + conn->func.on_error(conn->userdata, err); + } + } + return true; + } + } + return false; +} + +// Write bytes to the UTP socket. +// Returns true if the socket is still writable. +bool UTP_Write(UTPSocket *conn, size_t bytes) +{ + assert(conn); + +#ifdef g_log_utp_verbose + size_t param = bytes; +#endif + + if (conn->state != CS_CONNECTED) { + LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes); + return false; + } + + g_current_ms = UTP_GetMilliseconds(); + + conn->update_send_quota(); + + // don't send unless it will all fit in the window + size_t packet_size = conn->get_packet_size(); + size_t num_to_send = min(bytes, packet_size); + while (conn->is_writable(num_to_send)) { + // Send an outgoing packet. + // Also add it to the outgoing of packets that have been sent but not ACKed. + + if (num_to_send == 0) { + LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param); + return true; + } + bytes -= num_to_send; + + LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u", + conn, conn->seq_nr, conn->ack_nr, + (uint)(conn->cur_window + num_to_send), + (uint)conn->max_window, (uint)conn->max_window_user, + (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100, + conn->cur_window_packets); + conn->write_outgoing_packet(num_to_send, ST_DATA); + num_to_send = min(bytes, packet_size); + } + + // mark the socket as not being writable. 
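+ // (the caller should now wait for the UTP_STATE_WRITABLE callback,
+ // which fires once acks shrink cur_window below max_window again)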
+ conn->state = CS_CONNECTED_FULL; + LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes); + return false; +} + +void UTP_RBDrained(UTPSocket *conn) +{ + assert(conn); + + const size_t rcvwin = conn->get_rcv_window(); + + if (rcvwin > conn->last_rcv_win) { + // If last window was 0 send ACK immediately, otherwise should set timer + if (conn->last_rcv_win == 0) { + conn->send_ack(); + } else { + conn->ack_time = g_current_ms + min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD); + } + } +} + +void UTP_CheckTimeouts() +{ + g_current_ms = UTP_GetMilliseconds(); + + for (size_t i = 0; i < g_rst_info.GetCount(); i++) { + if ((int)(g_current_ms - g_rst_info[i].timestamp) >= RST_INFO_TIMEOUT) { + g_rst_info.MoveUpLast(i); + i--; + } + } + if (g_rst_info.GetCount() != g_rst_info.GetAlloc()) { + g_rst_info.Compact(); + } + + for (size_t i = 0; i != g_utp_sockets.GetCount(); i++) { + UTPSocket *conn = g_utp_sockets[i]; + conn->check_timeouts(); + + // Check if the object was deleted + if (conn->state == CS_DESTROY) { + LOG_UTPV("0x%08x: Destroying", conn); + UTP_Free(conn); + i--; + } + } +} + +size_t UTP_GetPacketSize(UTPSocket *socket) +{ + return socket->get_packet_size(); +} + +void UTP_GetPeerName(UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen) +{ + assert(conn); + + socklen_t len; + const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len); + *addrlen = min(len, *addrlen); + memcpy(addr, &sa, *addrlen); +} + +void UTP_GetDelays(UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age) +{ + assert(conn); + + if (ours) *ours = conn->our_hist.get_value(); + if (theirs) *theirs = conn->their_hist.get_value(); + if (age) *age = g_current_ms - conn->last_measured_delay; +} + +#ifdef _DEBUG +void UTP_GetStats(UTPSocket *conn, UTPStats *stats) +{ + assert(conn); + + *stats = conn->_stats; +} +#endif // _DEBUG + +void UTP_GetGlobalStats(UTPGlobalStats *stats) +{ + *stats = _global_stats; +} + +// Close the UTP socket. +// It is not valid for the upper layer to refer to socket after it is closed. +// Data will keep to try being delivered after the close. +void UTP_Close(UTPSocket *conn) +{ + assert(conn); + + assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY); + + LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]); + + switch(conn->state) { + case CS_CONNECTED: + case CS_CONNECTED_FULL: + conn->state = CS_FIN_SENT; + conn->write_outgoing_packet(0, ST_FIN); + break; + + case CS_SYN_SENT: + conn->rto_timeout = UTP_GetMilliseconds() + min(conn->rto * 2, 60); + case CS_GOT_FIN: + conn->state = CS_DESTROY_DELAY; + break; + + default: + conn->state = CS_DESTROY; + break; + } +} diff --git a/third-party/libutp/utp.h b/third-party/libutp/utp.h new file mode 100644 index 000000000..9c1c6f9c8 --- /dev/null +++ b/third-party/libutp/utp.h @@ -0,0 +1,163 @@ +#ifndef __UTP_H__ +#define __UTP_H__ + +#include "utypes.h" + +#ifdef WIN32 +#define _CRT_SECURE_NO_DEPRECATE +#define WIN32_LEAN_AND_MEAN +#include +#include +#include +#pragma comment(lib,"ws2_32.lib") +#else +#include +#include +#include +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +struct UTPSocket; + +// Used to set sockopt on a uTP socket to set the version of uTP +// to use for outgoing connections. 
This can only be called before +// the uTP socket is connected +#define SO_UTPVERSION 99 + +enum { + // socket has reveived syn-ack (notification only for outgoing connection completion) + // this implies writability + UTP_STATE_CONNECT = 1, + + // socket is able to send more data + UTP_STATE_WRITABLE = 2, + + // connection closed + UTP_STATE_EOF = 3, + + // socket is being destroyed, meaning all data has been sent if possible. + // it is not valid to refer to the socket after this state change occurs + UTP_STATE_DESTROYING = 4, +}; + +// Callbacks called by a uTP socket (register with UTP_SetCallbacks) + +// The uTP socket layer calls this when bytes have been received from the network. +typedef void UTPOnReadProc(void *userdata, const byte *bytes, size_t count); + +// The uTP socket layer calls this to fill the outgoing buffer with bytes. +// The uTP layer takes responsibility that those bytes will be delivered. +typedef void UTPOnWriteProc(void *userdata, byte *bytes, size_t count); + +// The uTP socket layer calls this to retrieve number of bytes currently in read buffer +typedef size_t UTPGetRBSize(void *userdata); + +// The uTP socket layer calls this whenever the socket becomes writable. +typedef void UTPOnStateChangeProc(void *userdata, int state); + +// The uTP socket layer calls this when an error occurs on the socket. +// These errors currently include ECONNREFUSED, ECONNRESET and ETIMEDOUT, but +// could eventually include any BSD socket error. +typedef void UTPOnErrorProc(void *userdata, int errcode); + +// The uTP socket layer calls this to report overhead statistics +typedef void UTPOnOverheadProc(void *userdata, bool send, size_t count, int type); + +struct UTPFunctionTable { + UTPOnReadProc *on_read; + UTPOnWriteProc *on_write; + UTPGetRBSize *get_rb_size; + UTPOnStateChangeProc *on_state; + UTPOnErrorProc *on_error; + UTPOnOverheadProc *on_overhead; +}; + + +// The uTP socket layer calls this when a new incoming uTP connection is established +// this implies writability +typedef void UTPGotIncomingConnection(void *userdata, struct UTPSocket* s); + +// The uTP socket layer calls this to send UDP packets +typedef void SendToProc(void *userdata, const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen); + + +// Functions which can be called with a uTP socket + +// Create a uTP socket +struct UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, + const struct sockaddr *addr, socklen_t addrlen); + +// Setup the callbacks - must be done before connect or on incoming connection +void UTP_SetCallbacks(struct UTPSocket *socket, struct UTPFunctionTable *func, void *userdata); + +// Valid options include SO_SNDBUF, SO_RCVBUF and SO_UTPVERSION +bool UTP_SetSockopt(struct UTPSocket *socket, int opt, int val); + +// Try to connect to a specified host. +void UTP_Connect(struct UTPSocket *socket); + +// Process a UDP packet from the network. This will process a packet for an existing connection, +// or create a new connection and call incoming_proc. Returns true if the packet was processed +// in some way, false if the packet did not appear to be uTP. +bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc, + SendToProc *send_to_proc, void *send_to_userdata, + const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen); + +// Process an ICMP received UDP packet. +bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen); + +// Write bytes to the uTP socket. 
+// Returns true if the socket is still writable.
+bool UTP_Write(struct UTPSocket *socket, size_t count);
+
+// Notify the uTP socket of buffer drain
+void UTP_RBDrained(struct UTPSocket *socket);
+
+// Call periodically to process timeouts and other periodic events
+void UTP_CheckTimeouts(void);
+
+// Retrieves the peer address of the specified socket, stores this address in the
+// sockaddr structure pointed to by the addr argument, and stores the length of this
+// address in the object pointed to by the addrlen argument.
+void UTP_GetPeerName(struct UTPSocket *socket, struct sockaddr *addr, socklen_t *addrlen);
+
+void UTP_GetDelays(struct UTPSocket *socket, int32 *ours, int32 *theirs, uint32 *age);
+
+size_t UTP_GetPacketSize(struct UTPSocket *socket);
+
+#ifdef _DEBUG
+struct UTPStats {
+ uint64 _nbytes_recv; // total bytes received
+ uint64 _nbytes_xmit; // total bytes transmitted
+ uint32 _rexmit; // retransmit counter
+ uint32 _fastrexmit; // fast retransmit counter
+ uint32 _nxmit; // transmit counter
+ uint32 _nrecv; // receive counter (total)
+ uint32 _nduprecv; // duplicate receive counter
+};
+
+// Get stats for UTP socket
+void UTP_GetStats(struct UTPSocket *socket, UTPStats *stats);
+#endif
+
+// Close the UTP socket.
+// It is not valid to issue commands for this socket after it is closed.
+// This does not actually destroy the socket until outstanding data is sent, at which
+// point the socket will change to the UTP_STATE_DESTROYING state.
+void UTP_Close(struct UTPSocket *socket);
+
+struct UTPGlobalStats {
+ uint32 _nraw_recv[5]; // total packets received less than 300/600/1200/MTU bytes for all connections (global)
+ uint32 _nraw_send[5]; // total packets sent less than 300/600/1200/MTU bytes for all connections (global)
+};
+
+void UTP_GetGlobalStats(struct UTPGlobalStats *stats);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //__UTP_H__
diff --git a/third-party/libutp/utp_config.h b/third-party/libutp/utp_config.h
new file mode 100644
index 000000000..bc804ac69
--- /dev/null
+++ b/third-party/libutp/utp_config.h
@@ -0,0 +1,37 @@
+#define CCONTROL_TARGET (100 * 1000) // us
+#define RATE_CHECK_INTERVAL 10000 // ms
+#define DYNAMIC_PACKET_SIZE_ENABLED false
+#define DYNAMIC_PACKET_SIZE_FACTOR 2
+// This should return the global number of bytes sent, used for determining dynamic
+// packet size based on rate
+uint64 UTP_GetGlobalUTPBytesSent(const struct sockaddr *remote, socklen_t remotelen) { return 0; }
+
+enum bandwidth_type_t {
+ payload_bandwidth, connect_overhead,
+ close_overhead, ack_overhead,
+ header_overhead, retransmit_overhead
+};
+
+#ifdef WIN32
+#define I64u "%I64u"
+#else
+#define I64u "%Lu"
+#endif
+#ifdef WIN32
+#define snprintf _snprintf
+#endif
+
+#define g_log_utp 0
+#define g_log_utp_verbose 0
+void utp_log(char const* fmt, ...)
+{ + /* + printf("[%u] ", UTP_GetMilliseconds()); + va_list vl; + va_start(vl, fmt); + vprintf(fmt, vl); + va_end(vl); + puts(""); + fflush(stdout); + */ +}; diff --git a/third-party/libutp/utp_config_example.h b/third-party/libutp/utp_config_example.h new file mode 100644 index 000000000..9573f8ba8 --- /dev/null +++ b/third-party/libutp/utp_config_example.h @@ -0,0 +1,26 @@ +#define CCONTROL_TARGET (100 * 1000) // us +#define RATE_CHECK_INTERVAL 10000 // ms +#define DYNAMIC_PACKET_SIZE_ENABLED false +#define DYNAMIC_PACKET_SIZE_FACTOR 2 +// This should return the global number of bytes sent, used for determining dynamic +// packet size based on rate +uint64 UTP_GetGlobalUTPBytesSent(const struct sockaddr *remote, socklen_t remotelen) { return 0; } + +enum bandwidth_type_t { + payload_bandwidth, connect_overhead, + close_overhead, ack_overhead, + header_overhead, retransmit_overhead +}; + +#ifdef WIN32 +#define I64u "%I64u" +#else +#define I64u "%Lu" +#endif +#ifdef WIN32 +#define snprintf _snprintf +#endif + +#define g_log_utp 0 +#define g_log_utp_verbose 0 +void utp_log(char const* fmt, ...); diff --git a/third-party/libutp/utp_utils.cpp b/third-party/libutp/utp_utils.cpp new file mode 100644 index 000000000..235303ad9 --- /dev/null +++ b/third-party/libutp/utp_utils.cpp @@ -0,0 +1,208 @@ +#include "StdAfx.h" + +#include "utypes.h" +#include +#include + +#ifdef WIN32 + +#define WIN32_LEAN_AND_MEAN +#include +#include +#include + +typedef ULONGLONG (WINAPI GetTickCount64Proc)(void); +static GetTickCount64Proc *pt2GetTickCount64; +static GetTickCount64Proc *pt2RealGetTickCount; + +static uint64 startPerformanceCounter; +static uint64 startGetTickCount; +// MSVC 6 standard doesn't like division with uint64s +static double counterPerMicrosecond; + +uint64 UTGetTickCount64() +{ + if (pt2GetTickCount64) { + return pt2GetTickCount64(); + } + if (pt2RealGetTickCount) { + uint64 v = pt2RealGetTickCount(); + // fix return value from GetTickCount + return (DWORD)v | ((v >> 0x18) & 0xFFFFFFFF00000000); + } + return (uint64)GetTickCount(); +} + +uint32 UTP_GetMilliseconds() +{ + return GetTickCount(); +} + +void Time_Initialize() +{ + HMODULE kernel32 = GetModuleHandleA("kernel32.dll"); + pt2GetTickCount64 = (GetTickCount64Proc*)GetProcAddress(kernel32, "GetTickCount64"); + // not a typo. GetTickCount actually returns 64 bits + pt2RealGetTickCount = (GetTickCount64Proc*)GetProcAddress(kernel32, "GetTickCount"); + + uint64 frequency; + QueryPerformanceCounter((LARGE_INTEGER*)&startPerformanceCounter); + QueryPerformanceFrequency((LARGE_INTEGER*)&frequency); + counterPerMicrosecond = (double)frequency / 1000000.0f; + startGetTickCount = UTGetTickCount64(); +} + +int64 abs64(int64 x) { return x < 0 ? -x : x; } + +uint64 UTP_GetMicroseconds() +{ + static bool time_init = false; + if (!time_init) { + time_init = true; + Time_Initialize(); + } + + uint64 counter; + uint64 tick; + + QueryPerformanceCounter((LARGE_INTEGER*) &counter); + tick = UTGetTickCount64(); + + // unfortunately, QueryPerformanceCounter is not guaranteed + // to be monotonic. Make it so. + int64 ret = (int64)(((int64)counter - (int64)startPerformanceCounter) / counterPerMicrosecond); + // if the QPC clock leaps more than one second off GetTickCount64() + // something is seriously fishy. 
Adjust QPC to stay monotonic + int64 tick_diff = tick - startGetTickCount; + if (abs64(ret / 100000 - tick_diff / 100) > 10) { + startPerformanceCounter -= (uint64)((int64)(tick_diff * 1000 - ret) * counterPerMicrosecond); + ret = (int64)((counter - startPerformanceCounter) / counterPerMicrosecond); + } + return ret; +} + +#else //!WIN32 + +#include +#include // Linux needs both time.h and sys/time.h +#include + +#include +#include +#include + +#if defined(__APPLE__) +#include + +uint64 UTP_GetMicroseconds() +{ + // http://developer.apple.com/mac/library/qa/qa2004/qa1398.html + // http://www.macresearch.org/tutorial_performance_and_time + static mach_timebase_info_data_t sTimebaseInfo; + static uint64_t start_tick = 0; + uint64_t tick; + // Returns a counter in some fraction of a nanoseconds + tick = mach_absolute_time(); + if (sTimebaseInfo.denom == 0) { + // Get the timer ratio to convert mach_absolute_time to nanoseconds + mach_timebase_info(&sTimebaseInfo); + start_tick = tick; + } + // Calculate the elapsed time, convert it to microseconds and return it. + return ((tick - start_tick) * sTimebaseInfo.numer) / (sTimebaseInfo.denom * 1000); +} + +#else + +/* Unfortunately, #ifdef CLOCK_MONOTONIC is not enough to make sure that + POSIX clocks work -- we could be running a recent libc with an ancient + kernel (think OpenWRT). -- jch */ + +uint64 UTP_GetMicroseconds() +{ + static int have_posix_clocks = -1; + int rc; + +#if defined(_POSIX_TIMERS) && _POSIX_TIMERS > 0 && defined(CLOCK_MONOTONIC) + if (have_posix_clocks < 0) { + struct timespec ts; + rc = clock_gettime(CLOCK_MONOTONIC, &ts); + if (rc < 0) { + have_posix_clocks = 0; + } else { + have_posix_clocks = 1; + } + } + + if (have_posix_clocks) { + struct timespec ts; + rc = clock_gettime(CLOCK_MONOTONIC, &ts); + return uint64(ts.tv_sec) * 1000000 + ts.tv_nsec / 1000; + } +#endif + { + static time_t offset = 0, previous = 0; + struct timeval tv; + rc = gettimeofday(&tv, NULL); + tv.tv_sec += offset; + if (previous > tv.tv_sec) { + offset += previous - tv.tv_sec; + tv.tv_sec = previous; + } + previous = tv.tv_sec; + return uint64(tv.tv_sec) * 1000000 + tv.tv_usec; + } +} +#endif + +uint32 UTP_GetMilliseconds() +{ + return UTP_GetMicroseconds() / 1000; +} + +#endif + + +#define ETHERNET_MTU 1500 +#define IPV4_HEADER_SIZE 20 +#define IPV6_HEADER_SIZE 40 +#define UDP_HEADER_SIZE 8 +#define GRE_HEADER_SIZE 24 +#define PPPOE_HEADER_SIZE 8 +#define MPPE_HEADER_SIZE 2 +// packets have been observed in the wild that were fragmented +// with a payload of 1416 for the first fragment +// There are reports of routers that have MTU sizes as small as 1392 +#define FUDGE_HEADER_SIZE 36 +#define TEREDO_MTU 1280 + +#define UDP_IPV4_OVERHEAD (IPV4_HEADER_SIZE + UDP_HEADER_SIZE) +#define UDP_IPV6_OVERHEAD (IPV6_HEADER_SIZE + UDP_HEADER_SIZE) +#define UDP_TEREDO_OVERHEAD (UDP_IPV4_OVERHEAD + UDP_IPV6_OVERHEAD) + +#define UDP_IPV4_MTU (ETHERNET_MTU - IPV4_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE - PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE) +#define UDP_IPV6_MTU (ETHERNET_MTU - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE - PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE) +#define UDP_TEREDO_MTU (TEREDO_MTU - UDP_HEADER_SIZE) + +uint16 UTP_GetUDPMTU(const struct sockaddr *remote, socklen_t remotelen) +{ + // Since we don't know the local address of the interface, + // be conservative and assume all IPv6 connections are Teredo. + return remote->sa_family == AF_INET6 ? 
UDP_TEREDO_MTU : UDP_IPV4_MTU; +} + +uint16 UTP_GetUDPOverhead(const struct sockaddr *remote, socklen_t remotelen) +{ + // Since we don't know the local address of the interface, + // be conservative and assume all IPv6 connections are Teredo. + return remote->sa_family == AF_INET6 ? UDP_TEREDO_OVERHEAD : UDP_IPV4_OVERHEAD; +} + +uint32 UTP_Random() +{ + return rand(); +} + +void UTP_DelaySample(const struct sockaddr *remote, int sample_ms) {} +size_t UTP_GetPacketSize(const struct sockaddr *remote) { return 1500; } + diff --git a/third-party/libutp/utp_utils.h b/third-party/libutp/utp_utils.h new file mode 100644 index 000000000..585a7e290 --- /dev/null +++ b/third-party/libutp/utp_utils.h @@ -0,0 +1,16 @@ +// This should return the MTU to the destination +uint16 UTP_GetUDPMTU(const struct sockaddr *remote, socklen_t remotelen); +// This should return the number of bytes of UDP overhead for one packet to the +// destination, for overhead calculation only +uint16 UTP_GetUDPOverhead(const struct sockaddr *remote, socklen_t remotelen); +// This should return monotonically increasing milliseconds, start point does not matter +uint32 UTP_GetMilliseconds(); +// This should return monotonically increasing microseconds, start point does not matter +uint64 UTP_GetMicroseconds(); +// This should return a random uint32 +uint32 UTP_Random(); +// This is called every time we have a delay sample is made +void UTP_DelaySample(const struct sockaddr *remote, int sample_ms); +// Should return the max packet size to use when sending to the given address +size_t UTP_GetPacketSize(const struct sockaddr *remote); + diff --git a/third-party/libutp/utypes.h b/third-party/libutp/utypes.h new file mode 100644 index 000000000..673554e24 --- /dev/null +++ b/third-party/libutp/utypes.h @@ -0,0 +1,42 @@ +#ifndef __UTYPES_H__ +#define __UTYPES_H__ + +// standard types +typedef unsigned char byte; +typedef unsigned char uint8; +typedef signed char int8; +typedef unsigned short uint16; +typedef signed short int16; +typedef unsigned int uint; +typedef unsigned int uint32; +typedef signed int int32; + +#ifdef _MSC_VER +typedef unsigned __int64 uint64; +typedef signed __int64 int64; +#else +typedef unsigned long long uint64; +typedef long long int64; +#endif + +/* compile-time assert */ +#ifndef CASSERT +#define CASSERT( exp, name ) typedef int is_not_##name [ (exp ) ? 1 : -1 ]; +#endif + +CASSERT(8 == sizeof(uint64), sizeof_uint64_is_8) +CASSERT(8 == sizeof(int64), sizeof_int64_is_8) + +#ifndef INT64_MAX +#define INT64_MAX 0x7fffffffffffffffLL +#endif + +// always ANSI +typedef const char * cstr; +typedef char * str; + +#ifndef __cplusplus +typedef uint8 bool; +#endif + +#endif //__UTYPES_H__