]> granicus.if.org Git - transmission/commitdiff
import libutp into third-party/ and plug it in to autoconf, automake
authorJordan Lee <jordan@transmissionbt.com>
Fri, 18 Feb 2011 00:31:49 +0000 (00:31 +0000)
committerJordan Lee <jordan@transmissionbt.com>
Fri, 18 Feb 2011 00:31:49 +0000 (00:31 +0000)
12 files changed:
third-party/libutp/LICENSE [new file with mode: 0644]
third-party/libutp/Makefile.am [new file with mode: 0644]
third-party/libutp/README.md [new file with mode: 0644]
third-party/libutp/StdAfx.h [new file with mode: 0644]
third-party/libutp/templates.h [new file with mode: 0644]
third-party/libutp/utp.cpp [new file with mode: 0644]
third-party/libutp/utp.h [new file with mode: 0644]
third-party/libutp/utp_config.h [new file with mode: 0644]
third-party/libutp/utp_config_example.h [new file with mode: 0644]
third-party/libutp/utp_utils.cpp [new file with mode: 0644]
third-party/libutp/utp_utils.h [new file with mode: 0644]
third-party/libutp/utypes.h [new file with mode: 0644]

diff --git a/third-party/libutp/LICENSE b/third-party/libutp/LICENSE
new file mode 100644 (file)
index 0000000..73acb81
--- /dev/null
@@ -0,0 +1,19 @@
+Copyright (c) 2010 BitTorrent, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/third-party/libutp/Makefile.am b/third-party/libutp/Makefile.am
new file mode 100644 (file)
index 0000000..847f596
--- /dev/null
@@ -0,0 +1,6 @@
+AM_CPPFLAGS = -fno-exceptions -fno-rtti -ansi -DPOSIX
+
+noinst_LIBRARIES = libutp.a
+libutp_a_SOURCES = utp.cpp utp_utils.cpp
+noinst_HEADERS = StdAfx.h  templates.h  utp_config_example.h  utp.h  utp_config.h utp_utils.h  utypes.h
+EXTRA_DIST = LICENSE README.md
diff --git a/third-party/libutp/README.md b/third-party/libutp/README.md
new file mode 100644 (file)
index 0000000..d7fca4c
--- /dev/null
@@ -0,0 +1,59 @@
+# libutp - The uTorrent Transport Protocol library.
+Copyright (c) 2010 BitTorrent, Inc.
+
+uTP is a TCP-like implementation of [LEDBAT][ledbat] documented as a BitTorrent
+extension in [BEP-29][bep29]. uTP provides provides reliable, ordered delivery
+while maintaining minimum extra delay. It is implemented on top of UDP to be
+cross-platform and functional today. As a result, uTP is the primary transport
+for uTorrent peer-to-peer connections.
+
+uTP is written in C++, but the external interface is strictly C (ANSI C89).
+
+## The Interface
+
+The uTP socket interface is a bit different from the Berkeley socket API to
+avoid the need for our own select() implementation, and to make it easier to
+write event-based code with minimal buffering.
+
+When you create a uTP socket, you register a set of callbacks. Most notably, the
+on_read callback is a reactive callback which occurs when bytes arrive off the
+network. The write side of the socket is proactive, and you call UTP_Write to
+indicate the number of bytes you wish to write. As packets are created, the
+on_write callback is called for each packet, so you can fill the buffers with
+data.
+
+The libutp interface is not thread-safe. It was designed for use in a
+single-threaded asyncronous context, although with proper synchronization
+it may be used from a multi-threaded environment as well.
+
+See utp.h for more details and other API documentation.
+
+## Examples
+
+See the utp_test and utp_file directories for examples.
+
+## Building
+
+uTP has been known to build on Windows with MSVC and on linux and OS X with gcc.
+On Windows, use the MSVC project files (utp.sln, and friends). On other platforms,
+building the shared library is as simple as:
+
+    make
+
+To build one of the examples, which will statically link in everything it needs
+from libutp:
+
+    cd utp_test && make
+
+## License
+
+libutp is released under the [MIT][lic] license.
+
+## Related Work
+
+Research and analysis of congestion control mechanisms can be found [here.][survey]
+
+[ledbat]: http://datatracker.ietf.org/wg/ledbat/charter/
+[bep29]: http://www.bittorrent.org/beps/bep_0029.html
+[lic]: http://www.opensource.org/licenses/mit-license.php
+[survey]: http://datatracker.ietf.org/doc/draft-ietf-ledbat-survey/
diff --git a/third-party/libutp/StdAfx.h b/third-party/libutp/StdAfx.h
new file mode 100644 (file)
index 0000000..a92dfad
--- /dev/null
@@ -0,0 +1,11 @@
+#if !defined(AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_)
+#define AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_
+
+#if _MSC_VER > 1000
+#pragma once
+#endif // _MSC_VER > 1000
+
+// I don't have anything to put here, but some projects use precompiled headers,
+// so I include StdAfx.h anyway, so they don't have to edit the files to compile normally.
+
+#endif // !defined(AFX_STDAFX_H__C1470942_E9DA_4913_BEF1_9BA7584E595B__INCLUDED_)
diff --git a/third-party/libutp/templates.h b/third-party/libutp/templates.h
new file mode 100644 (file)
index 0000000..9e98fb3
--- /dev/null
@@ -0,0 +1,164 @@
+#ifndef __TEMPLATES_H__
+#define __TEMPLATES_H__
+
+#include "utypes.h"
+#include <assert.h>
+
+#if defined(POSIX)
+/* Allow over-writing FORCEINLINE from makefile because gcc 3.4.4 for buffalo
+   doesn't seem to support __attribute__((always_inline)) in -O0 build
+   (strangely, it works in -Os build) */
+#ifndef FORCEINLINE
+// The always_inline attribute asks gcc to inline the function even if no optimization is being requested.
+// This macro should be used exclusive-or with the inline directive (use one or the other but not both)
+// since Microsoft uses __forceinline to also mean inline,
+// and this code is following a Microsoft compatibility model.
+// Just setting the attribute without also specifying the inline directive apparently won't inline the function,
+// as evidenced by multiply-defined symbols found at link time.
+#define FORCEINLINE inline __attribute__((always_inline))
+#endif
+#endif
+
+// Utility templates
+#undef min
+#undef max
+
+template <typename T> static inline T min(T a, T b) { if (a < b) return a; return b; }
+template <typename T> static inline T max(T a, T b) { if (a > b) return a; return b; }
+
+template <typename T> static inline T min(T a, T b, T c) { return min(min(a,b),c); }
+template <typename T> static inline T max(T a, T b, T c) { return max(max(a,b),c); }
+template <typename T> static inline T clamp(T v, T mi, T ma)
+{
+       if (v > ma) v = ma;
+       if (v < mi) v = mi;
+       return v;
+}
+
+#pragma pack(push,1)
+
+namespace aux
+{
+       FORCEINLINE uint16 host_to_network(uint16 i) { return htons(i); }
+       FORCEINLINE uint32 host_to_network(uint32 i) { return htonl(i); }
+       FORCEINLINE int32 host_to_network(int32 i) { return htonl(i); }
+       FORCEINLINE uint16 network_to_host(uint16 i) { return ntohs(i); }
+       FORCEINLINE uint32 network_to_host(uint32 i) { return ntohl(i); }
+       FORCEINLINE int32 network_to_host(int32 i) { return ntohl(i); }
+}
+
+template <class T>
+struct big_endian
+{
+       T operator=(T i) { m_integer = aux::host_to_network(i); return i; }
+       operator T() const { return aux::network_to_host(m_integer); }
+private:
+       T m_integer;
+};
+
+typedef big_endian<int32> int32_big;
+typedef big_endian<uint32> uint32_big;
+typedef big_endian<uint16> uint16_big;
+
+#pragma pack(pop)
+
+template<typename T> static inline void zeromem(T *a, size_t count = 1) { memset(a, 0, count * sizeof(T)); }
+
+typedef int SortCompareProc(const void *, const void *);
+
+template<typename T> static FORCEINLINE void QuickSortT(T *base, size_t num, int (*comp)(const T *, const T *)) { qsort(base, num, sizeof(T), (SortCompareProc*)comp); }
+
+
+// WARNING: The template parameter MUST be a POD type!
+template <typename T, size_t minsize = 16> class Array {
+protected:
+       T *mem;
+       size_t alloc,count;
+
+public:
+       Array(size_t init) { Init(init); }
+       Array() { Init(); }
+       ~Array() { Free(); }
+
+       void inline Init() { mem = NULL; alloc = count = 0; }
+       void inline Init(size_t init) { Init(); if (init) Resize(init); }
+       size_t inline GetCount() const { return count; }
+       size_t inline GetAlloc() const { return alloc; }
+       void inline SetCount(size_t c) { count = c; }
+
+       inline T& operator[](size_t offset) { assert(offset ==0 || offset<alloc); return mem[offset]; }
+       inline const T& operator[](size_t offset) const { assert(offset ==0 || offset<alloc); return mem[offset]; }
+
+       void inline Resize(size_t a) {
+               if (a == 0) { free(mem); Init(); }
+               else { mem = (T*)realloc(mem, (alloc=a) * sizeof(T)); }
+       }
+
+       void Grow() { Resize(::max<size_t>(minsize, alloc * 2)); }
+
+       inline size_t Append(const T &t) {
+               if (count >= alloc) Grow();
+               size_t r=count++;
+               mem[r] = t;
+               return r;
+       }
+
+       T inline &Append() {
+               if (count >= alloc) Grow();
+               return mem[count++];
+       }
+
+       void inline Compact() {
+               Resize(count);
+       }
+
+       void inline Free() {
+               free(mem);
+               Init();
+       }
+
+       void inline Clear() {
+               count = 0;
+       }
+
+       bool inline MoveUpLast(size_t index) {
+               assert(index < count);
+               size_t c = --count;
+               if (index != c) {
+                       mem[index] = mem[c];
+                       return true;
+               }
+               return false;
+       }
+
+       bool inline MoveUpLastExist(const T &v) {
+               return MoveUpLast(LookupElementExist(v));
+       }
+
+       size_t inline LookupElement(const T &v) const {
+               for(size_t i = 0; i != count; i++)
+                       if (mem[i] == v)
+                               return i;
+               return (size_t) -1;
+       }
+
+       bool inline HasElement(const T &v) const {
+               return LookupElement(v) != -1;
+       }
+
+       typedef int SortCompareProc(const T *a, const T *b);
+
+       void Sort(SortCompareProc* proc, size_t start, size_t end) {
+               QuickSortT(&mem[start], end - start, proc);
+       }
+
+       void Sort(SortCompareProc* proc, size_t start) {
+               Sort(proc, start, count);
+       }
+
+       void Sort(SortCompareProc* proc) {
+               Sort(proc, 0, count);
+       }
+};
+
+#endif //__TEMPLATES_H__
diff --git a/third-party/libutp/utp.cpp b/third-party/libutp/utp.cpp
new file mode 100644 (file)
index 0000000..16c0bf7
--- /dev/null
@@ -0,0 +1,2841 @@
+#include <StdAfx.h>
+
+#include "utp.h"
+#include "templates.h"
+
+#include <stdio.h>
+#include <assert.h>
+#include <string.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <limits.h> // for UINT_MAX
+
+#ifdef WIN32
+#include "win32_inet_ntop.h"
+
+// newer versions of MSVC define these in errno.h
+#ifndef ECONNRESET
+#define ECONNRESET WSAECONNRESET
+#define EMSGSIZE WSAEMSGSIZE
+#define ECONNREFUSED WSAECONNREFUSED
+#define ETIMEDOUT WSAETIMEDOUT
+#endif
+#endif
+
+#ifdef POSIX
+typedef sockaddr_storage SOCKADDR_STORAGE;
+#endif // POSIX
+
+// number of bytes to increase max window size by, per RTT. This is
+// scaled down linearly proportional to off_target. i.e. if all packets
+// in one window have 0 delay, window size will increase by this number.
+// Typically it's less. TCP increases one MSS per RTT, which is 1500
+#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
+#define CUR_DELAY_SIZE 3
+// experiments suggest that a clock skew of 10 ms per 325 seconds
+// is not impossible. Reset delay_base every 13 minutes. The clock
+// skew is dealt with by observing the delay base in the other
+// direction, and adjusting our own upwards if the opposite direction
+// delay base keeps going down
+#define DELAY_BASE_HISTORY 13
+#define MAX_WINDOW_DECAY 100 // ms
+
+#define REORDER_BUFFER_SIZE 32
+#define REORDER_BUFFER_MAX_SIZE 511
+#define OUTGOING_BUFFER_MAX_SIZE 511
+
+#define PACKET_SIZE 350
+
+// this is the minimum max_window value. It can never drop below this
+#define MIN_WINDOW_SIZE 10
+
+// when window sizes are smaller than one packet_size, this
+// will pace the packets to average at the given window size
+// if it's not set, it will simply not send anything until
+// there's a timeout
+#define USE_PACKET_PACING 1
+
+// if we receive 4 or more duplicate acks, we resend the packet
+// that hasn't been acked yet
+#define DUPLICATE_ACKS_BEFORE_RESEND 3
+
+#define DELAYED_ACK_BYTE_THRESHOLD 2400 // bytes
+#define DELAYED_ACK_TIME_THRESHOLD 100 // milliseconds
+
+#define RST_INFO_TIMEOUT 10000
+#define RST_INFO_LIMIT 1000
+// 29 seconds determined from measuring many home NAT devices
+#define KEEPALIVE_INTERVAL 29000
+
+
+#define SEQ_NR_MASK 0xFFFF
+#define ACK_NR_MASK 0xFFFF
+
+#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
+
+#include "utp_utils.h"
+#include "utp_config.h"
+
+#define LOG_UTP if (g_log_utp) utp_log
+#define LOG_UTPV if (g_log_utp_verbose) utp_log
+
+uint32 g_current_ms;
+
+// The totals are derived from the following data:
+//  45: IPv6 address including embedded IPv4 address
+//  11: Scope Id
+//   2: Brackets around IPv6 address when port is present
+//   6: Port (including colon)
+//   1: Terminating null byte
+char addrbuf[65];
+char addrbuf2[65];
+#define addrfmt(x, s) x.fmt(s, sizeof(s))
+
+#pragma pack(push,1)
+
+struct PackedSockAddr {
+
+       // The values are always stored here in network byte order
+       union {
+               byte _in6[16];          // IPv6
+               uint16 _in6w[8];        // IPv6, word based (for convenience)
+               uint32 _in6d[4];        // Dword access
+               in6_addr _in6addr;      // For convenience
+       } _in;
+
+       // Host byte order
+       uint16 _port;
+
+#define _sin4 _in._in6d[3]     // IPv4 is stored where it goes if mapped
+
+#define _sin6 _in._in6
+#define _sin6w _in._in6w
+#define _sin6d _in._in6d
+
+       byte get_family() const
+       {
+               return (IN6_IS_ADDR_V4MAPPED((in6_addr*)_sin6) != 0) ? AF_INET : AF_INET6;
+       }
+
+       bool operator==(const PackedSockAddr& rhs) const
+       {
+               if (&rhs == this)
+                       return true;
+               if (_port != rhs._port)
+                       return false;
+               return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0;
+       }
+       bool operator!=(const PackedSockAddr& rhs) const { return !(*this == rhs); }
+
+       PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len)
+       {
+               if (sa->ss_family == AF_INET) {
+                       assert(len >= sizeof(sockaddr_in));
+                       const sockaddr_in *sin = (sockaddr_in*)sa;
+                       _sin6w[0] = 0;
+                       _sin6w[1] = 0;
+                       _sin6w[2] = 0;
+                       _sin6w[3] = 0;
+                       _sin6w[4] = 0;
+                       _sin6w[5] = 0xffff;
+                       _sin4 = sin->sin_addr.s_addr;
+                       _port = ntohs(sin->sin_port);
+               } else {
+                       assert(len >= sizeof(sockaddr_in6));
+                       const sockaddr_in6 *sin6 = (sockaddr_in6*)sa;
+                       _in._in6addr = sin6->sin6_addr;
+                       _port = ntohs(sin6->sin6_port);
+               }
+       }
+
+       SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len = NULL) const
+       {
+               SOCKADDR_STORAGE sa;
+               const byte family = get_family();
+               if (family == AF_INET) {
+                       sockaddr_in *sin = (sockaddr_in*)&sa;
+                       if (len) *len = sizeof(sockaddr_in);
+                       memset(sin, 0, sizeof(sockaddr_in));
+                       sin->sin_family = family;
+                       sin->sin_port = htons(_port);
+                       sin->sin_addr.s_addr = _sin4;
+               } else {
+                       sockaddr_in6 *sin6 = (sockaddr_in6*)&sa;
+                       memset(sin6, 0, sizeof(sockaddr_in6));
+                       if (len) *len = sizeof(sockaddr_in6);
+                       sin6->sin6_family = family;
+                       sin6->sin6_addr = _in._in6addr;
+                       sin6->sin6_port = htons(_port);
+               }
+               return sa;
+       }
+
+       cstr fmt(str s, size_t len) const
+       {
+               memset(s, 0, len);
+               const byte family = get_family();
+               str i;
+               if (family == AF_INET) {
+                       inet_ntop(family, (uint32*)&_sin4, s, len);
+                       i = s;
+                       while (*++i) {}
+               } else {
+                       i = s;
+                       *i++ = '[';
+                       inet_ntop(family, (in6_addr*)&_in._in6addr, i, len-1);
+                       while (*++i) {}
+                       *i++ = ']';
+               }
+               snprintf(i, len - (i-s), ":%u", _port);
+               return s;
+       }
+};
+
+struct RST_Info {
+       PackedSockAddr addr;
+       uint32 connid;
+       uint32 timestamp;
+       uint16 ack_nr;
+};
+
+// these packet sizes are including the uTP header wich
+// is either 20 or 23 bytes depending on version
+#define PACKET_SIZE_EMPTY_BUCKET 0
+#define PACKET_SIZE_EMPTY 23
+#define PACKET_SIZE_SMALL_BUCKET 1
+#define PACKET_SIZE_SMALL 373
+#define PACKET_SIZE_MID_BUCKET 2
+#define PACKET_SIZE_MID 723
+#define PACKET_SIZE_BIG_BUCKET 3
+#define PACKET_SIZE_BIG 1400
+#define PACKET_SIZE_HUGE_BUCKET 4
+
+struct PacketFormat {
+       // connection ID
+       uint32_big connid;
+       uint32_big tv_sec;
+       uint32_big tv_usec;
+       uint32_big reply_micro;
+       // receive window size in PACKET_SIZE chunks
+       byte windowsize;
+       // Type of the first extension header
+       byte ext;
+       // Flags
+       byte flags;
+       // Sequence number
+       uint16_big seq_nr;
+       // Acknowledgment number
+       uint16_big ack_nr;
+};
+
+struct PacketFormatAck {
+       PacketFormat pf;
+       byte ext_next;
+       byte ext_len;
+       byte acks[4];
+};
+
+struct PacketFormatExtensions {
+       PacketFormat pf;
+       byte ext_next;
+       byte ext_len;
+       byte extensions[8];
+};
+
+struct PacketFormatV1 {
+       // protocol version
+       byte version:4;
+       // type (formerly flags)
+       byte type:4;
+       // Type of the first extension header
+       byte ext;
+       // connection ID
+       uint16_big connid;
+       uint32_big tv_usec;
+       uint32_big reply_micro;
+       // receive window size in bytes
+       uint32_big windowsize;
+       // Sequence number
+       uint16_big seq_nr;
+       // Acknowledgment number
+       uint16_big ack_nr;
+};
+
+struct PacketFormatAckV1 {
+       PacketFormatV1 pf;
+       byte ext_next;
+       byte ext_len;
+       byte acks[4];
+};
+
+struct PacketFormatExtensionsV1 {
+       PacketFormatV1 pf;
+       byte ext_next;
+       byte ext_len;
+       byte extensions[8];
+};
+
+#pragma pack(pop)
+
+enum {
+       ST_DATA = 0,            // Data packet.
+       ST_FIN = 1,                     // Finalize the connection. This is the last packet.
+       ST_STATE = 2,           // State packet. Used to transmit an ACK with no data.
+       ST_RESET = 3,           // Terminate connection forcefully.
+       ST_SYN = 4,                     // Connect SYN
+       ST_NUM_STATES,          // used for bounds checking
+};
+
+static const cstr flagnames[] = {
+       "ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
+};
+
+enum CONN_STATE {
+       CS_IDLE = 0,
+       CS_SYN_SENT = 1,
+       CS_CONNECTED = 2,
+       CS_CONNECTED_FULL = 3,
+       CS_GOT_FIN = 4,
+       CS_DESTROY_DELAY = 5,
+       CS_FIN_SENT = 6,
+       CS_RESET = 7,
+       CS_DESTROY = 8,
+};
+
+static const cstr statenames[] = {
+       "IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
+};
+
+struct OutgoingPacket {
+       size_t length;
+       size_t payload;
+       uint64 time_sent; // microseconds
+       uint transmissions:31;
+       bool need_resend:1;
+       byte data[1];
+};
+
+void no_read(void *socket, const byte *bytes, size_t count) {}
+void no_write(void *socket, byte *bytes, size_t count) {}
+size_t no_rb_size(void *socket) { return 0; }
+void no_state(void *socket, int state) {}
+void no_error(void *socket, int errcode) {}
+void no_overhead(void *socket, bool send, size_t count, int type) {}
+
+UTPFunctionTable zero_funcs = {
+       &no_read,
+       &no_write,
+       &no_rb_size,
+       &no_state,
+       &no_error,
+       &no_overhead,
+};
+
+struct SizableCircularBuffer {
+       // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
+       size_t mask;
+       // This is the elements that the circular buffer points to
+       void **elements;
+
+       void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; }
+       void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
+
+       void grow(size_t item, size_t index);
+       void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
+       size_t size() { return mask + 1; }
+};
+
+static struct UTPGlobalStats _global_stats;
+
+// Item contains the element we want to make space for
+// index is the index in the list.
+void SizableCircularBuffer::grow(size_t item, size_t index)
+{
+       // Figure out the new size.
+       size_t size = mask + 1;
+       do size *= 2; while (index >= size);
+
+       // Allocate the new buffer
+       void **buf = (void**)calloc(size, sizeof(void*));
+
+       size--;
+
+       // Copy elements from the old buffer to the new buffer
+       for (size_t i = 0; i <= mask; i++) {
+               buf[(item - index + i) & size] = get(item - index + i);
+       }
+
+       // Swap to the newly allocated buffer
+       mask = size;
+       free(elements);
+       elements = buf;
+}
+
+// compare if lhs is less than rhs, taking wrapping
+// into account. if lhs is close to UINT_MAX and rhs
+// is close to 0, lhs is assumed to have wrapped and
+// considered smaller
+bool wrapping_compare_less(uint32 lhs, uint32 rhs)
+{
+       // distance walking from lhs to rhs, downwards
+       const uint32 dist_down = lhs - rhs;
+       // distance walking from lhs to rhs, upwards
+       const uint32 dist_up = rhs - lhs;
+
+       // if the distance walking up is shorter, lhs
+       // is less than rhs. If the distance walking down
+       // is shorter, then rhs is less than lhs
+       return dist_up < dist_down;
+}
+
+struct DelayHist {
+       uint32 delay_base;
+
+       // this is the history of delay samples,
+       // normalized by using the delay_base. These
+       // values are always greater than 0 and measures
+       // the queuing delay in microseconds
+       uint32 cur_delay_hist[CUR_DELAY_SIZE];
+       size_t cur_delay_idx;
+
+       // this is the history of delay_base. It's
+       // a number that doesn't have an absolute meaning
+       // only relative. It doesn't make sense to initialize
+       // it to anything other than values relative to
+       // what's been seen in the real world.
+       uint32 delay_base_hist[DELAY_BASE_HISTORY];
+       size_t delay_base_idx;
+       // the time when we last stepped the delay_base_idx
+       uint32 delay_base_time;
+
+       bool delay_base_initialized;
+
+       void clear()
+       {
+               delay_base_initialized = false;
+               delay_base = 0;
+               cur_delay_idx = 0;
+               delay_base_idx = 0;
+               delay_base_time = g_current_ms;
+               for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
+                       cur_delay_hist[i] = 0;
+               }
+               for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
+                       delay_base_hist[i] = 0;
+               }
+       }
+
+       void shift(const uint32 offset)
+       {
+               // the offset should never be "negative"
+               assert(offset < 0x10000000);
+
+               // increase all of our base delays by this amount
+               // this is used to take clock skew into account
+               // by observing the other side's changes in its base_delay
+               for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
+                       delay_base_hist[i] += offset;
+               }
+               delay_base += offset;
+       }
+
+       void add_sample(const uint32 sample)
+       {
+               // The two clocks (in the two peers) are assumed not to
+               // progress at the exact same rate. They are assumed to be
+               // drifting, which causes the delay samples to contain
+               // a systematic error, either they are under-
+               // estimated or over-estimated. This is why we update the
+               // delay_base every two minutes, to adjust for this.
+
+               // This means the values will keep drifting and eventually wrap.
+               // We can cross the wrapping boundry in two directions, either
+               // going up, crossing the highest value, or going down, crossing 0.
+
+               // if the delay_base is close to the max value and sample actually
+               // wrapped on the other end we would see something like this:
+               // delay_base = 0xffffff00, sample = 0x00000400
+               // sample - delay_base = 0x500 which is the correct difference
+
+               // if the delay_base is instead close to 0, and we got an even lower
+               // sample (that will eventually update the delay_base), we may see
+               // something like this:
+               // delay_base = 0x00000400, sample = 0xffffff00
+               // sample - delay_base = 0xfffffb00
+               // this needs to be interpreted as a negative number and the actual
+               // recorded delay should be 0.
+
+               // It is important that all arithmetic that assume wrapping
+               // is done with unsigned intergers. Signed integers are not guaranteed
+               // to wrap the way unsigned integers do. At least GCC takes advantage
+               // of this relaxed rule and won't necessarily wrap signed ints.
+
+               // remove the clock offset and propagation delay.
+               // delay base is min of the sample and the current
+               // delay base. This min-operation is subject to wrapping
+               // and care needs to be taken to correctly choose the
+               // true minimum.
+
+               // specifically the problem case is when delay_base is very small
+               // and sample is very large (because it wrapped past zero), sample
+               // needs to be considered the smaller
+
+               if (!delay_base_initialized) {
+                       // delay_base being 0 suggests that we haven't initialized
+                       // it or its history with any real measurements yet. Initialize
+                       // everything with this sample.
+                       for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
+                               // if we don't have a value, set it to the current sample
+                               delay_base_hist[i] = sample;
+                               continue;
+                       }
+                       delay_base = sample;
+                       delay_base_initialized = true;
+               }
+
+               if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx])) {
+                       // sample is smaller than the current delay_base_hist entry
+                       // update it
+                       delay_base_hist[delay_base_idx] = sample;
+               }
+
+               // is sample lower than delay_base? If so, update delay_base
+               if (wrapping_compare_less(sample, delay_base)) {
+                       // sample is smaller than the current delay_base
+                       // update it
+                       delay_base = sample;
+               }
+               
+               // this operation may wrap, and is supposed to
+               const uint32 delay = sample - delay_base;
+               // sanity check. If this is triggered, something fishy is going on
+               // it means the measured sample was greater than 32 seconds!
+//             assert(delay < 0x2000000);
+
+               cur_delay_hist[cur_delay_idx] = delay;
+               cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;
+
+               // once every minute
+               if (g_current_ms - delay_base_time > 60 * 1000) {
+                       delay_base_time = g_current_ms;
+                       delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
+                       // clear up the new delay base history spot by initializing
+                       // it to the current sample, then update it 
+                       delay_base_hist[delay_base_idx] = sample;
+                       delay_base = delay_base_hist[0];
+                       // Assign the lowest delay in the last 2 minutes to delay_base
+                       for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
+                               if (wrapping_compare_less(delay_base_hist[i], delay_base))
+                                       delay_base = delay_base_hist[i];
+                       }
+               }
+       }
+
+       uint32 get_value()
+       {
+               uint32 value = UINT_MAX;
+               for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
+                       value = min<uint32>(cur_delay_hist[i], value);
+               }
+               // value could be UINT_MAX if we have no samples yet...
+               return value;
+       }
+};
+
+struct UTPSocket {
+       PackedSockAddr addr;
+
+       size_t idx;
+
+       uint16 reorder_count;
+       byte duplicate_ack;
+
+       // the number of bytes we've received but not acked yet
+       size_t bytes_since_ack;
+
+       // the number of packets in the send queue. Packets that haven't
+       // yet been sent count as well as packets marked as needing resend
+       // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
+       uint16 cur_window_packets;
+
+       // how much of the window is used, number of bytes in-flight
+       // packets that have not yet been sent do not count, packets
+       // that are marked as needing to be re-sent (due to a timeout)
+       // don't count either
+       size_t cur_window;
+       // maximum window size, in bytes
+       size_t max_window;
+       // SO_SNDBUF setting, in bytes
+       size_t opt_sndbuf;
+       // SO_RCVBUF setting, in bytes
+       size_t opt_rcvbuf;
+
+       // Is a FIN packet in the reassembly buffer?
+       bool got_fin:1;
+       // Timeout procedure
+       bool fast_timeout:1;
+
+       // max receive window for other end, in bytes
+       size_t max_window_user;
+       // 0 = original uTP header, 1 = second revision
+       byte version;
+       CONN_STATE state;
+       // TickCount when we last decayed window (wraps)
+       int32 last_rwin_decay;
+
+       // the sequence number of the FIN packet. This field is only set
+       // when we have received a FIN, and the flag field has the FIN flag set.
+       // it is used to know when it is safe to destroy the socket, we must have
+       // received all packets up to this sequence number first.
+       uint16 eof_pkt;
+
+       // All sequence numbers up to including this have been properly received
+       // by us
+       uint16 ack_nr;
+       // This is the sequence number for the next packet to be sent.
+       uint16 seq_nr;
+
+       uint16 timeout_seq_nr;
+
+       // This is the sequence number of the next packet we're allowed to
+       // do a fast resend with. This makes sure we only do a fast-resend
+       // once per packet. We can resend the packet with this sequence number
+       // or any later packet (with a higher sequence number).
+       uint16 fast_resend_seq_nr;
+
+       uint32 reply_micro;
+
+       // the time when we need to send another ack. If there's
+       // nothing to ack, this is a very large number
+       uint32 ack_time;
+
+       uint32 last_got_packet;
+       uint32 last_sent_packet;
+       uint32 last_measured_delay;
+       uint32 last_maxed_out_window;
+
+       // the last time we added send quota to the connection
+       // when adding send quota, this is subtracted from the
+       // current time multiplied by max_window / rtt
+       // which is the current allowed send rate.
+       int32 last_send_quota;
+
+       // the number of bytes we are allowed to send on
+       // this connection. If this is more than one packet
+       // size when we run out of data to send, it is clamped
+       // to the packet size
+       // this value is multiplied by 100 in order to get
+       // higher accuracy when dealing with low rates
+       int32 send_quota;
+
+       SendToProc *send_to_proc;
+       void *send_to_userdata;
+       UTPFunctionTable func;
+       void *userdata;
+
+       // Round trip time
+       uint rtt;
+       // Round trip time variance
+       uint rtt_var;
+       // Round trip timeout
+       uint rto;
+       DelayHist rtt_hist;
+       uint retransmit_timeout;
+       // The RTO timer will timeout here.
+       uint rto_timeout;
+       // When the window size is set to zero, start this timer. It will send a new packet every 30secs.
+       uint32 zerowindow_time;
+
+       uint32 conn_seed;
+       // Connection ID for packets I receive
+       uint32 conn_id_recv;
+       // Connection ID for packets I send
+       uint32 conn_id_send;
+       // Last rcv window we advertised, in bytes
+       size_t last_rcv_win;
+
+       DelayHist our_hist;
+       DelayHist their_hist;
+
+       // extension bytes from SYN packet
+       byte extensions[8];
+
+       SizableCircularBuffer inbuf, outbuf;
+
+#ifdef _DEBUG
+       // Public stats, returned by UTP_GetStats().  See utp.h
+       UTPStats _stats;
+#endif // _DEBUG
+
+       // Calculates the current receive window
+       size_t get_rcv_window() const
+       {
+               // If we don't have a connection (such as during connection
+               // establishment, always act as if we have an empty buffer).
+               if (!userdata) return opt_rcvbuf;
+
+               // Trim window down according to what's already in buffer.
+               const size_t numbuf = func.get_rb_size(userdata);
+               assert((int)numbuf >= 0);
+               return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
+       }
+
+       // Test if we're ready to decay max_window
+       // XXX this breaks when spaced by > INT_MAX/2, which is 49
+       // days; the failure mode in that case is we do an extra decay
+       // or fail to do one when we really shouldn't.
+       bool can_decay_win(int32 msec) const
+       {
+               return msec - last_rwin_decay >= MAX_WINDOW_DECAY;
+       }
+
+       // If we can, decay max window, returns true if we actually did so
+       void maybe_decay_win()
+       {
+               if (can_decay_win(g_current_ms)) {
+                       // TCP uses 0.5
+                       max_window = (size_t)(max_window * .5);
+                       last_rwin_decay = g_current_ms;
+                       if (max_window < MIN_WINDOW_SIZE)
+                               max_window = MIN_WINDOW_SIZE;
+               }
+       }
+
+       size_t get_header_size() const
+       {
+               return (version ? sizeof(PacketFormatV1) : sizeof(PacketFormat));
+       }
+
+       size_t get_header_extensions_size() const
+       {
+               return (version ? sizeof(PacketFormatExtensionsV1) : sizeof(PacketFormatExtensions));
+       }
+
+       void sent_ack()
+       {
+               ack_time = g_current_ms + 0x70000000;
+               bytes_since_ack = 0;
+       }
+
+       size_t get_udp_mtu() const
+       {
+               socklen_t len;
+               SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
+               return UTP_GetUDPMTU((const struct sockaddr *)&sa, len);
+       }
+
+       size_t get_udp_overhead() const
+       {
+               socklen_t len;
+               SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
+               return UTP_GetUDPOverhead((const struct sockaddr *)&sa, len);
+       }
+
+       uint64 get_global_utp_bytes_sent() const
+       {
+               socklen_t len;
+               SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
+               return UTP_GetGlobalUTPBytesSent((const struct sockaddr *)&sa, len);
+       }
+
+       size_t get_overhead() const
+       {
+               return get_udp_overhead() + get_header_size();
+       }
+
+       void send_data(PacketFormat* b, size_t length, bandwidth_type_t type);
+
+       void send_ack(bool synack = false);
+
+       void send_keep_alive();
+
+       static void send_rst(SendToProc *send_to_proc, void *send_to_userdata,
+                                                const PackedSockAddr &addr, uint32 conn_id_send,
+                                                uint16 ack_nr, uint16 seq_nr, byte version);
+
+       void send_packet(OutgoingPacket *pkt);
+
+       bool is_writable(size_t to_write);
+
+       bool flush_packets();
+
+       void write_outgoing_packet(size_t payload, uint flags);
+
+       void update_send_quota();
+
+#ifdef _DEBUG
+       void check_invariant();
+#endif
+
+       void check_timeouts();
+
+       int ack_packet(uint16 seq);
+
+       size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
+
+       void selective_ack(uint base, const byte *mask, byte len);
+
+       void apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
+
+       size_t get_packet_size();
+};
+
+Array<RST_Info> g_rst_info;
+Array<UTPSocket*> g_utp_sockets;
+
+static void UTP_RegisterSentPacket(size_t length) {
+       if (length <= PACKET_SIZE_MID) {
+               if (length <= PACKET_SIZE_EMPTY) {
+                       _global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
+               } else if (length <= PACKET_SIZE_SMALL) {
+                       _global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
+               } else
+                       _global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
+       } else {
+               if (length <= PACKET_SIZE_BIG) {
+                       _global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
+               } else
+                       _global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
+       }
+}
+
+void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, const byte *p, size_t len, const PackedSockAddr &addr)
+{
+       socklen_t tolen;
+       SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
+       UTP_RegisterSentPacket(len);
+       send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
+}
+
+void UTPSocket::send_data(PacketFormat* b, size_t length, bandwidth_type_t type)
+{
+       // time stamp this packet with local time, the stamp goes into
+       // the header of every packet at the 8th byte for 8 bytes :
+       // two integers, check packet.h for more
+       uint64 time = UTP_GetMicroseconds();
+
+       PacketFormatV1* b1 = (PacketFormatV1*)b;
+       if (version == 0) {
+               b->tv_sec = (uint32)(time / 1000000);
+               b->tv_usec = time % 1000000;
+               b->reply_micro = reply_micro;
+       } else {
+               b1->tv_usec = (uint32)time;
+               b1->reply_micro = reply_micro;
+       }
+
+       last_sent_packet = g_current_ms;
+
+#ifdef _DEBUG
+       _stats._nbytes_xmit += length;
+       ++_stats._nxmit;
+#endif
+       if (userdata) {
+               size_t n;
+               if (type == payload_bandwidth) {
+                       // if this packet carries payload, just
+                       // count the header as overhead
+                       type = header_overhead;
+                       n = get_overhead();
+               } else {
+                       n = length + get_udp_overhead();
+               }
+               func.on_overhead(userdata, true, n, type);
+       }
+#if g_log_utp_verbose
+       int flags = version == 0 ? b->flags : b1->type;
+       uint16 seq_nr = version == 0 ? b->seq_nr : b1->seq_nr;
+       uint16 ack_nr = version == 0 ? b->ack_nr : b1->ack_nr;
+       LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
+                        this, addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags],
+                        seq_nr, ack_nr);
+#endif
+       send_to_addr(send_to_proc, send_to_userdata, (const byte*)b, length, addr);
+}
+
+void UTPSocket::send_ack(bool synack)
+{
+       PacketFormatExtensions pfe;
+       zeromem(&pfe);
+       PacketFormatExtensionsV1& pfe1 = (PacketFormatExtensionsV1&)pfe;
+       PacketFormatAck& pfa = (PacketFormatAck&)pfe1;
+       PacketFormatAckV1& pfa1 = (PacketFormatAckV1&)pfe1;
+
+       size_t len;
+       last_rcv_win = get_rcv_window();
+       if (version == 0) {
+               pfa.pf.connid = conn_id_send;
+               pfa.pf.ack_nr = (uint16)ack_nr;
+               pfa.pf.seq_nr = (uint16)seq_nr;
+               pfa.pf.flags = ST_STATE;
+               pfa.pf.ext = 0;
+               pfa.pf.windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
+               len = sizeof(PacketFormat);
+       } else {
+               pfa1.pf.version = 1;
+               pfa1.pf.type = ST_STATE;
+               pfa1.pf.ext = 0;
+               pfa1.pf.connid = conn_id_send;
+               pfa1.pf.ack_nr = ack_nr;
+               pfa1.pf.seq_nr = seq_nr;
+               pfa1.pf.windowsize = (uint32)last_rcv_win;
+               len = sizeof(PacketFormatV1);
+       }
+
+       // we never need to send EACK for connections
+       // that are shutting down
+       if (reorder_count != 0 && state < CS_GOT_FIN) {
+               // if reorder count > 0, send an EACK.
+               // reorder count should always be 0
+               // for synacks, so this should not be
+               // as synack
+               assert(!synack);
+               if (version == 0) {
+                       pfa.pf.ext = 1;
+                       pfa.ext_next = 0;
+                       pfa.ext_len = 4;
+               } else {
+                       pfa1.pf.ext = 1;
+                       pfa1.ext_next = 0;
+                       pfa1.ext_len = 4;
+               }
+               uint m = 0;
+
+               // reorder count should only be non-zero
+               // if the packet ack_nr + 1 has not yet
+               // been received
+               assert(inbuf.get(ack_nr + 1) == NULL);
+               size_t window = min<size_t>(14+16, inbuf.size());
+               // Generate bit mask of segments received.
+               for (size_t i = 0; i < window; i++) {
+                       if (inbuf.get(ack_nr + i + 2) != NULL) {
+                               m |= 1 << i;
+                               LOG_UTPV("0x%08x: EACK packet [%u]", this, ack_nr + i + 2);
+                       }
+               }
+               if (version == 0) {
+                       pfa.acks[0] = (byte)m;
+                       pfa.acks[1] = (byte)(m >> 8);
+                       pfa.acks[2] = (byte)(m >> 16);
+                       pfa.acks[3] = (byte)(m >> 24);
+               } else {
+                       pfa1.acks[0] = (byte)m;
+                       pfa1.acks[1] = (byte)(m >> 8);
+                       pfa1.acks[2] = (byte)(m >> 16);
+                       pfa1.acks[3] = (byte)(m >> 24);
+               }
+               len += 4 + 2;
+               LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", this, ack_nr, conn_id_send, m);
+       } else if (synack) {
+               // we only send "extensions" in response to SYN
+               // and the reorder count is 0 in that state
+
+               LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", this, ack_nr, conn_id_send);
+               if (version == 0) {
+                       pfe.pf.ext = 2;
+                       pfe.ext_next = 0;
+                       pfe.ext_len = 8;
+                       memset(pfe.extensions, 0, 8);
+               } else {
+                       pfe1.pf.ext = 2;
+                       pfe1.ext_next = 0;
+                       pfe1.ext_len = 8;
+                       memset(pfe1.extensions, 0, 8);
+               }
+               len += 8 + 2;
+       } else {
+               LOG_UTPV("0x%08x: Sending ACK %u [%u]", this, ack_nr, conn_id_send);
+       }
+
+       sent_ack();
+       send_data((PacketFormat*)&pfe, len, ack_overhead);
+}
+
+void UTPSocket::send_keep_alive()
+{
+       ack_nr--;
+       LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", this, ack_nr, conn_id_send);
+       send_ack();
+       ack_nr++;
+}
+
+void UTPSocket::send_rst(SendToProc *send_to_proc, void *send_to_userdata,
+                                                const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
+{
+       PacketFormat pf;
+       zeromem(&pf);
+       PacketFormatV1& pf1 = (PacketFormatV1&)pf;
+
+       size_t len;
+       if (version == 0) {
+               pf.connid = conn_id_send;
+               pf.ack_nr = ack_nr;
+               pf.seq_nr = seq_nr;
+               pf.flags = ST_RESET;
+               pf.ext = 0;
+               pf.windowsize = 0;
+               len = sizeof(PacketFormat);
+       } else {
+               pf1.version = 1;
+               pf1.type= ST_RESET;
+               pf1.ext = 0;
+               pf1.connid = conn_id_send;
+               pf1.ack_nr = ack_nr;
+               pf1.seq_nr = seq_nr;
+               pf1.windowsize = 0;
+               len = sizeof(PacketFormatV1);
+       }
+
+       LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
+       LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
+       send_to_addr(send_to_proc, send_to_userdata, (const byte*)&pf1, len, addr);
+}
+
+void UTPSocket::send_packet(OutgoingPacket *pkt)
+{
+       // only count against the quota the first time we
+       // send the packet. Don't enforce quota when closing
+       // a socket. Only enforce the quota when we're sending
+       // at slow rates (max window < packet size)
+       size_t max_send = min(max_window, opt_sndbuf, max_window_user);
+
+       if (pkt->transmissions == 0 || pkt->need_resend) {
+               cur_window += pkt->payload;
+       }
+
+       size_t packet_size = get_packet_size();
+       if (pkt->transmissions == 0 && max_send < packet_size) {
+               assert(state == CS_FIN_SENT ||
+                          (int32)pkt->payload <= send_quota / 100);
+               send_quota = send_quota - (int32)(pkt->payload * 100);
+       }
+
+       pkt->need_resend = false;
+
+       PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
+       PacketFormat* p = (PacketFormat*)pkt->data;
+       if (version == 0) {
+               p->ack_nr = ack_nr;
+       } else {
+               p1->ack_nr = ack_nr;
+       }
+       pkt->time_sent = UTP_GetMicroseconds();
+       pkt->transmissions++;
+       sent_ack();
+       send_data((PacketFormat*)pkt->data, pkt->length,
+               (state == CS_SYN_SENT) ? connect_overhead
+               : (pkt->transmissions == 1) ? payload_bandwidth
+               : retransmit_overhead);
+}
+
+bool UTPSocket::is_writable(size_t to_write)
+{
+       // return true if it's OK to stuff another packet into the
+       // outgoing queue. Since we may be using packet pacing, we
+       // might not actually send the packet right away to affect the
+       // cur_window. The only thing that happens when we add another
+       // packet is that cur_window_packets is increased.
+       size_t max_send = min(max_window, opt_sndbuf, max_window_user);
+
+       size_t packet_size = get_packet_size();
+
+       if (cur_window + packet_size >= max_window)
+               last_maxed_out_window = g_current_ms;
+
+       // if we don't have enough quota, we can't write regardless
+       if (USE_PACKET_PACING) {
+               if (send_quota / 100 < (int32)to_write) return false;
+       }
+
+       // subtract one to save space for the FIN packet
+       if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) return false;
+
+       // if sending another packet would not make the window exceed
+       // the max_window, we can write
+       if (cur_window + packet_size <= max_send) return true;
+
+       // if the window size is less than a packet, and we have enough
+       // quota to send a packet, we can write, even though it would
+       // make the window exceed the max size
+       // the last condition is needed to not put too many packets
+       // in the send buffer. cur_window isn't updated until we flush
+       // the send buffer, so we need to take the number of packets
+       // into account
+       if (USE_PACKET_PACING) {
+               if (max_window < to_write &&
+                       cur_window < max_window &&
+                       cur_window_packets == 0) {
+                       return true;
+               }
+       }
+
+       return false;
+}
+
+bool UTPSocket::flush_packets()
+{
+       size_t packet_size = get_packet_size();
+
+       // send packets that are waiting on the pacer to be sent
+       // i has to be an unsigned 16 bit counter to wrap correctly
+       // signed types are not guaranteed to wrap the way you expect
+       for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
+               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
+               if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
+               // have we run out of quota?
+               if (!is_writable(pkt->payload)) {
+                       return true;
+               }
+
+               // Nagle check
+               // don't send the last packet if we have one packet in-flight
+               // and the current packet is still smaller than packet_size.
+               if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
+                       cur_window_packets == 1 ||
+                       pkt->payload >= packet_size) {
+                       send_packet(pkt);
+
+                       // No need to send another ack if there is nothing to reorder.
+                       if (reorder_count == 0) {
+                               sent_ack();
+                       }
+               }
+       }
+       return false;
+}
+
+void UTPSocket::write_outgoing_packet(size_t payload, uint flags)
+{
+       // Setup initial timeout timer
+       if (cur_window_packets == 0) {
+               retransmit_timeout = rto;
+               rto_timeout = g_current_ms + retransmit_timeout;
+               assert(cur_window == 0);
+       }
+
+       size_t packet_size = get_packet_size();
+       do {
+               assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
+               assert(flags == ST_DATA || flags == ST_FIN);
+
+               size_t added = 0;
+
+               OutgoingPacket *pkt = NULL;
+               
+               if (cur_window_packets > 0) {
+                       pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
+               }
+
+               const size_t header_size = get_header_size();
+               bool append = true;
+
+               // if there's any room left in the last packet in the window
+               // and it hasn't been sent yet, fill that frame first
+               if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
+                       // Use the previous unsent packet
+                       added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
+                       pkt = (OutgoingPacket*)realloc(pkt,
+                                                                                  (sizeof(OutgoingPacket) - 1) +
+                                                                                  header_size +
+                                                                                  pkt->payload + added);
+                       outbuf.put(seq_nr - 1, pkt);
+                       append = false;
+                       assert(!pkt->need_resend);
+               } else {
+                       // Create the packet to send.
+                       added = payload;
+                       pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
+                                                                                 header_size +
+                                                                                 added);
+                       pkt->payload = 0;
+                       pkt->transmissions = 0;
+                       pkt->need_resend = false;
+               }
+
+               if (added) {
+                       // Fill it with data from the upper layer.
+                       func.on_write(userdata, pkt->data + header_size + pkt->payload, added);
+               }
+               pkt->payload += added;
+               pkt->length = header_size + pkt->payload;
+
+               last_rcv_win = get_rcv_window();
+
+               PacketFormat* p = (PacketFormat*)pkt->data;
+               PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
+               if (version == 0) {
+                       p->connid = conn_id_send;
+                       p->ext = 0;
+                       p->windowsize = (byte)DIV_ROUND_UP(last_rcv_win, PACKET_SIZE);
+                       p->ack_nr = ack_nr;
+                       p->flags = flags;
+               } else {
+                       p1->version = 1;
+                       p1->type = flags;
+                       p1->ext = 0;
+                       p1->connid = conn_id_send;
+                       p1->windowsize = (uint32)last_rcv_win;
+                       p1->ack_nr = ack_nr;
+               }
+
+               if (append) {
+                       // Remember the message in the outgoing queue.
+                       outbuf.ensure_size(seq_nr, cur_window_packets);
+                       outbuf.put(seq_nr, pkt);
+                       if (version == 0) p->seq_nr = seq_nr;
+                       else p1->seq_nr = seq_nr;
+                       seq_nr++;
+                       cur_window_packets++;
+               }
+
+               payload -= added;
+
+       } while (payload);
+
+       flush_packets();
+}
+
+void UTPSocket::update_send_quota()
+{
+       int dt = g_current_ms - last_send_quota;
+       if (dt == 0) return;
+       last_send_quota = g_current_ms;
+       size_t add = max_window * dt * 100 / (rtt_hist.delay_base?rtt_hist.delay_base:50);
+       if (add > max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100) add = max_window;
+       send_quota += (int32)add;
+//     LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d",
+//                      this, dt, rtt, (uint)max_window, send_quota / 100);
+}
+
+#ifdef _DEBUG
+void UTPSocket::check_invariant()
+{
+       if (reorder_count > 0) {
+               assert(inbuf.get(ack_nr + 1) == NULL);
+       }
+
+       size_t outstanding_bytes = 0;
+       for (int i = 0; i < cur_window_packets; ++i) {
+               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
+               if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
+               outstanding_bytes += pkt->payload;
+       }
+       assert(outstanding_bytes == cur_window);
+}
+#endif
+
+void UTPSocket::check_timeouts()
+{
+#ifdef _DEBUG
+       check_invariant();
+#endif
+
+       // this invariant should always be true
+       assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));
+
+       LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d "
+                        "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d",
+                        this, (int)(rto_timeout - g_current_ms), (uint)max_window, (uint)cur_window,
+                        send_quota / 100, statenames[state], cur_window_packets,
+                        (uint)bytes_since_ack, (int)(g_current_ms - ack_time));
+
+       update_send_quota();
+       flush_packets();
+
+
+       if (USE_PACKET_PACING) {
+               // In case the new send quota made it possible to send another packet
+               // Mark the socket as writable. If we don't use pacing, the send
+               // quota does not affect if the socket is writeable
+               // if we don't use packet pacing, the writable event is triggered
+               // whenever the cur_window falls below the max_window, so we don't
+               // need this check then
+               if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
+                       state = CS_CONNECTED;
+                       LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
+                                        this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
+                       func.on_state(userdata, UTP_STATE_WRITABLE);
+               }
+       }
+
+       switch (state) {
+       case CS_SYN_SENT:
+       case CS_CONNECTED_FULL:
+       case CS_CONNECTED:
+       case CS_FIN_SENT: {
+
+               // Reset max window...
+               if ((int)(g_current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
+                       max_window_user = PACKET_SIZE;
+               }
+
+               if ((int)(g_current_ms - rto_timeout) >= 0 &&
+                       (!(USE_PACKET_PACING) || cur_window_packets > 0) &&
+                       rto_timeout > 0) {
+
+                       /*
+                       OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
+                       
+                       // If there were a lot of retransmissions, force recomputation of round trip time
+                       if (pkt->transmissions >= 4)
+                               rtt = 0;
+                       */
+
+                       // Increase RTO
+                       const uint new_timeout = retransmit_timeout * 2;
+                       if (new_timeout >= 30000 || (state == CS_SYN_SENT && new_timeout > 6000)) {
+                               // more than 30 seconds with no reply. kill it.
+                               // if we haven't even connected yet, give up sooner. 6 seconds
+                               // means 2 tries at the following timeouts: 3, 6 seconds
+                               if (state == CS_FIN_SENT)
+                                       state = CS_DESTROY;
+                               else
+                                       state = CS_RESET;
+                               func.on_error(userdata, ETIMEDOUT);
+                               goto getout;
+                       }
+
+                       retransmit_timeout = new_timeout;
+                       rto_timeout = g_current_ms + new_timeout;
+
+                       // On Timeout
+                       duplicate_ack = 0;
+
+                       // rate = min_rate
+                       max_window = get_packet_size();
+                       send_quota = max<int32>((int32)max_window * 100, send_quota);
+
+                       // every packet should be considered lost
+                       for (int i = 0; i < cur_window_packets; ++i) {
+                               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
+                               if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
+                               pkt->need_resend = true;
+                               assert(cur_window >= pkt->payload);
+                               cur_window -= pkt->payload;
+                       }
+
+                       // used in parse_log.py
+                       LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u",
+                                       this, seq_nr - cur_window_packets, retransmit_timeout, (uint)max_window);
+
+                       fast_timeout = true;
+                       timeout_seq_nr = seq_nr;
+
+                       if (cur_window_packets > 0) {
+                               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
+                               assert(pkt);
+                               send_quota = max<int32>((int32)pkt->length * 100, send_quota);
+
+                               // Re-send the packet.
+                               send_packet(pkt);
+                       }
+               }
+
+               // Mark the socket as writable
+               if (state == CS_CONNECTED_FULL && is_writable(get_packet_size())) {
+                       state = CS_CONNECTED;
+                       LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
+                                        this, (uint)max_window, (uint)cur_window, send_quota / 100, (uint)get_packet_size());
+                       func.on_state(userdata, UTP_STATE_WRITABLE);
+               }
+
+               if (state >= CS_CONNECTED && state <= CS_FIN_SENT) {
+                       // Send acknowledgment packets periodically, or when the threshold is reached
+                       if (bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
+                               (int)(g_current_ms - ack_time) >= 0) {
+                               send_ack();
+                       }
+
+                       if ((int)(g_current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
+                               send_keep_alive();
+                       }
+               }
+
+               break;
+       }
+
+       // Close?
+       case CS_GOT_FIN:
+       case CS_DESTROY_DELAY:
+               if ((int)(g_current_ms - rto_timeout) >= 0) {
+                       state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
+                       if (cur_window_packets > 0 && userdata) {
+                               func.on_error(userdata, ECONNRESET);
+                       }
+               }
+               break;
+       // prevent warning
+       case CS_IDLE:
+       case CS_RESET:
+       case CS_DESTROY:
+               break;
+       }
+
+       getout:
+
+       // make sure we don't accumulate quota when we don't have
+       // anything to send
+       int32 limit = max<int32>((int32)max_window / 2, 5 * (int32)get_packet_size()) * 100;
+       if (send_quota > limit) send_quota = limit;
+}
+
+// returns:
+// 0: the packet was acked.
+// 1: it means that the packet had already been acked
+// 2: the packet has not been sent yet
+int UTPSocket::ack_packet(uint16 seq)
+{
+       OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);
+
+       // the packet has already been acked (or not sent)
+       if (pkt == NULL) {
+               LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", this, seq);
+               return 1;
+       }
+
+       // can't ack packets that haven't been sent yet!
+       if (pkt->transmissions == 0) {
+               LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
+                                this, seq, (uint)pkt->payload, pkt->need_resend);
+               return 2;
+       }
+
+       LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)",
+                        this, seq, (uint)pkt->payload, pkt->need_resend);
+
+       outbuf.put(seq, NULL);
+
+       // if we never re-sent the packet, update the RTT estimate
+       if (pkt->transmissions == 1) {
+               // Estimate the round trip time.
+               const uint32 ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);
+               if (rtt == 0) {
+                       // First round trip time sample
+                       rtt = ertt;
+                       rtt_var = ertt / 2;
+                       // sanity check. rtt should never be more than 6 seconds
+//                     assert(rtt < 6000);
+               } else {
+                       // Compute new round trip times
+                       const int delta = (int)rtt - ertt;
+                       rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
+                       rtt = rtt - rtt/8 + ertt/8;
+                       // sanity check. rtt should never be more than 6 seconds
+//                     assert(rtt < 6000);
+                       rtt_hist.add_sample(ertt);
+               }
+               rto = max<uint>(rtt + rtt_var * 4, 500);
+               LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u",
+                                this, ertt, rtt, rtt_var, rto);
+       }
+       retransmit_timeout = rto;
+       rto_timeout = g_current_ms + rto;
+       // if need_resend is set, this packet has already
+       // been considered timed-out, and is not included in
+       // the cur_window anymore
+       if (!pkt->need_resend) {
+               assert(cur_window >= pkt->payload);
+               cur_window -= pkt->payload;
+       }
+       free(pkt);
+       return 0;
+}
+
+// count the number of bytes that were acked by the EACK header
+size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
+{
+       if (cur_window_packets == 0) return 0;
+
+       size_t acked_bytes = 0;
+       int bits = len * 8;
+
+       do {
+               uint v = base + bits;
+
+               // ignore bits that haven't been sent yet
+               // see comment in UTPSocket::selective_ack
+               if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
+                       continue;
+
+               // ignore bits that represents packets we haven't sent yet
+               // or packets that have already been acked
+               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
+               if (!pkt || pkt->transmissions == 0)
+                       continue;
+
+               // Count the number of segments that were successfully received past it.
+               if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
+                       assert((int)(pkt->payload) >= 0);
+                       acked_bytes += pkt->payload;
+                       min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
+                       continue;
+               }
+       } while (--bits >= -1);
+       return acked_bytes;
+}
+
+void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
+{
+       if (cur_window_packets == 0) return;
+
+       // the range is inclusive [0, 31] bits
+       int bits = len * 8 - 1;
+
+       int count = 0;
+
+       // resends is a stack of sequence numbers we need to resend. Since we
+       // iterate in reverse over the acked packets, at the end, the top packets
+       // are the ones we want to resend
+       int resends[32];
+       int nr = 0;
+
+       LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", this, *(uint32*)mask, base);
+       do {
+               // we're iterating over the bits from higher sequence numbers
+               // to lower (kind of in reverse order, wich might not be very
+               // intuitive)
+               uint v = base + bits;
+
+               // ignore bits that haven't been sent yet
+               // and bits that fall below the ACKed sequence number
+               // this can happen if an EACK message gets
+               // reordered and arrives after a packet that ACKs up past
+               // the base for thie EACK message
+
+               // this is essentially the same as:
+               // if v >= seq_nr || v <= seq_nr - cur_window_packets
+               // but it takes wrapping into account
+
+               // if v == seq_nr the -1 will make it wrap. if v > seq_nr
+               // it will also wrap (since it will fall further below 0)
+               // and be > cur_window_packets.
+               // if v == seq_nr - cur_window_packets, the result will be
+               // seq_nr - (seq_nr - cur_window_packets) - 1
+               // == seq_nr - seq_nr + cur_window_packets - 1
+               // == cur_window_packets - 1 which will be caught by the
+               // test. If v < seq_nr - cur_window_packets the result will grow
+               // fall furhter outside of the cur_window_packets range.
+
+               // sequence number space:
+               //
+               //     rejected <   accepted   > rejected 
+               // <============+--------------+============>
+               //              ^              ^
+               //              |              |
+               //        (seq_nr-wnd)         seq_nr
+
+               if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
+                       continue;
+
+               // this counts as a duplicate ack, even though we might have
+               // received an ack for this packet previously (in another EACK
+               // message for instance)
+               bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));
+
+               // if this packet is acked, it counts towards the duplicate ack counter
+               if (bit_set) count++;
+
+               // ignore bits that represents packets we haven't sent yet
+               // or packets that have already been acked
+               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
+               if (!pkt || pkt->transmissions == 0) {
+                       LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s",
+                                        this, v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
+                       continue;
+               }
+
+               // Count the number of segments that were successfully received past it.
+               if (bit_set) {
+                       // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
+                       assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
+                       ack_packet(v);
+                       continue;
+               }
+
+               // Resend segments
+               // if count is less than our re-send limit, we haven't seen enough
+               // acked packets in front of this one to warrant a re-send.
+               // if count == 0, we're still going through the tail of zeroes
+               if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
+                       count >= DUPLICATE_ACKS_BEFORE_RESEND &&
+                       duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
+                       resends[nr++] = v;
+                       LOG_UTPV("0x%08x: no ack for %u", this, v);
+               } else {
+                       LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
+                                        this, v, count, duplicate_ack, fast_resend_seq_nr);
+               }
+       } while (--bits >= -1);
+
+       if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) < 256 &&
+               count >= DUPLICATE_ACKS_BEFORE_RESEND &&
+               duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
+               // if we get enough duplicate acks to start
+               // resending, the first packet we should resend
+               // is base-1
+               resends[nr++] = base - 1;
+       } else {
+               LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
+                                this, base - 1, count, duplicate_ack, fast_resend_seq_nr);
+       }
+
+       bool back_off = false;
+       int i = 0;
+       while (nr > 0) {
+               uint v = resends[--nr];
+               // don't consider the tail of 0:es to be lost packets
+               // only unacked packets with acked packets after should
+               // be considered lost
+               OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
+
+               // this may be an old (re-ordered) packet, and some of the
+               // packets in here may have been acked already. In which
+               // case they will not be in the send queue anymore
+               if (!pkt) continue;
+
+               // used in parse_log.py
+               LOG_UTP("0x%08x: Packet %u lost. Resending", this, v);
+
+               // On Loss
+               back_off = true;
+#ifdef _DEBUG
+               ++_stats._rexmit;
+#endif
+               send_packet(pkt);
+               fast_resend_seq_nr = v + 1;
+
+               // Re-send max 4 packets.
+               if (++i >= 4) break;
+       }
+
+       if (back_off)
+               maybe_decay_win();
+
+       duplicate_ack = count;
+}
+
+void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
+{
+       // the delay can never be greater than the rtt. The min_rtt
+       // variable is the RTT in microseconds
+       
+       assert(min_rtt >= 0);
+       int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
+       assert(our_delay != INT_MAX);
+       assert(our_delay >= 0);
+       assert(our_hist.get_value() >= 0);
+
+       SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
+       UTP_DelaySample((sockaddr*)&sa, our_delay / 1000);
+
+       // This test the connection under heavy load from foreground
+       // traffic. Pretend that our delays are very high to force the
+       // connection to use sub-packet size window sizes
+       //our_delay *= 4;
+
+       // target is microseconds
+       int target = CCONTROL_TARGET;
+       if (target <= 0) target = 100000;
+
+       double off_target = target - our_delay;
+
+       // this is the same as:
+       //
+       //    (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
+       //
+       // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
+       // of the target delay the current delay represents.
+       // The min() around off_target protects against crazy values of our_delay, which may happen when th
+       // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
+       // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
+       // as for large negative numbers, this direction is already capped at the min packet size further down
+       // the min around the bytes_acked protects against the case where the window size was recently
+       // shrunk and the number of acked bytes exceeds that. This is considered no more than one full
+       // window, in order to keep the gain within sane boundries.
+
+       assert(bytes_acked > 0);
+       double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
+       double delay_factor = off_target / target;
+       double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
+
+       // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
+       // may increase per RTT, we may not increase the window size more than that proportional
+       // to the number of bytes that were acked, so that once one window has been acked (one rtt)
+       // the increase limit is not exceeded
+       // the +1. is to allow for floating point imprecision
+       assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));
+
+       if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) {
+               // if it was more than 300 milliseconds since we tried to send a packet
+               // and stopped because we hit the max window, we're most likely rate
+               // limited (which prevents us from ever hitting the window size)
+               // if this is the case, we cannot let the max_window grow indefinitely
+               scaled_gain = 0;
+       }
+
+       if (scaled_gain + max_window < MIN_WINDOW_SIZE) {
+               max_window = MIN_WINDOW_SIZE;
+       } else {
+               max_window = (size_t)(max_window + scaled_gain);
+       }
+
+       // make sure that the congestion window is below max
+       // make sure that we don't shrink our window too small
+       max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);
+
+       // used in parse_log.py
+       LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
+                       "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
+                       "scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" "
+                       "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u",
+                       this, actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
+                       (int)off_target / 1000, (uint)(max_window),  our_hist.delay_base,
+                       (our_delay + their_hist.get_value()) / 1000, target / 1000, (uint)bytes_acked,
+                       (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
+                       (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
+                       send_quota / 100, (uint)max_window_user, rto, (int)(rto_timeout - g_current_ms),
+                       UTP_GetMicroseconds(), cur_window_packets, (uint)get_packet_size(),
+                       their_hist.delay_base, their_hist.delay_base + their_hist.get_value());
+}
+
+static void UTP_RegisterRecvPacket(UTPSocket *conn, size_t len)
+{
+#ifdef _DEBUG
+       ++conn->_stats._nrecv;
+       conn->_stats._nbytes_recv += len;
+#endif
+
+       if (len <= PACKET_SIZE_MID) {
+               if (len <= PACKET_SIZE_EMPTY) {
+                       _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
+               } else if (len <= PACKET_SIZE_SMALL) {
+                       _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
+               } else 
+                       _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
+       } else {
+               if (len <= PACKET_SIZE_BIG) {
+                       _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
+               } else 
+                       _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
+       }
+}
+
+// returns the max number of bytes of payload the uTP
+// connection is allowed to send
+size_t UTPSocket::get_packet_size()
+{
+       int header_size = version == 1
+               ? sizeof(PacketFormatV1)
+               : sizeof(PacketFormat);
+
+       size_t mtu = get_udp_mtu();
+
+       if (DYNAMIC_PACKET_SIZE_ENABLED) {
+               SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
+               size_t max_packet_size = UTP_GetPacketSize((sockaddr*)&sa);
+               return min(mtu - header_size, max_packet_size);
+       }
+       else
+       {
+               return mtu - header_size;
+       }
+}
+
+// Process an incoming packet
+// syn is true if this is the first packet received. It will cut off parsing
+// as soon as the header is done
+size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
+{
+       UTP_RegisterRecvPacket(conn, len);
+
+       g_current_ms = UTP_GetMilliseconds();
+
+       conn->update_send_quota();
+
+       const PacketFormat *pf = (PacketFormat*)packet;
+       const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
+       const byte *packet_end = packet + len;
+
+       uint16 pk_seq_nr;
+       uint16 pk_ack_nr;
+       uint8 pk_flags;
+       if (conn->version == 0) {
+               pk_seq_nr = pf->seq_nr;
+               pk_ack_nr = pf->ack_nr;
+               pk_flags = pf->flags;
+       } else {
+               pk_seq_nr = pf1->seq_nr;
+               pk_ack_nr = pf1->ack_nr;
+               pk_flags = pf1->type;
+       }
+
+       if (pk_flags >= ST_NUM_STATES) return 0;
+
+       LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u",
+                        conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version,
+                        conn->version == 0?(uint64)(pf->tv_sec) * 1000000 + pf->tv_usec:uint64(pf1->tv_usec),
+                        conn->version == 0?(uint32)(pf->reply_micro):(uint32)(pf1->reply_micro));
+
+       // mark receipt time
+       uint64 time = UTP_GetMicroseconds();
+
+       // RSTs are handled earlier, since the connid matches the send id not the recv id
+       assert(pk_flags != ST_RESET);
+
+       // TODO: maybe send a ST_RESET if we're in CS_RESET?
+
+       const byte *selack_ptr = NULL;
+
+       // Unpack UTP packet options
+       // Data pointer
+       const byte *data = (const byte*)pf + conn->get_header_size();
+       if (conn->get_header_size() > len) {
+               LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
+               return 0;
+       }
+       // Skip the extension headers
+       uint extension = conn->version == 0 ? pf->ext : pf1->ext;
+       if (extension != 0) {
+               do {
+                       // Verify that the packet is valid.
+                       data += 2;
+
+                       if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
+                               LOG_UTPV("0x%08x: Invalid len of extensions", conn);
+                               return 0;
+                       }
+
+                       switch(extension) {
+                       case 1: // Selective Acknowledgment
+                               selack_ptr = data;
+                               break;
+                       case 2: // extension bits
+                               if (data[-1] != 8) {
+                                       LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
+                                       return 0;
+                               }
+                               memcpy(conn->extensions, data, 8);
+                               LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn,
+                                       conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
+                                       conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
+                       }
+                       extension = data[-2];
+                       data += data[-1];
+               } while (extension);
+       }
+
+       if (conn->state == CS_SYN_SENT) {
+               // if this is a syn-ack, initialize our ack_nr
+               // to match the sequence number we got from
+               // the other end
+               conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
+       }
+
+       g_current_ms = UTP_GetMilliseconds();
+       conn->last_got_packet = g_current_ms;
+
+       if (syn) {
+               return 0;
+       }
+
+       // seqnr is the number of packets past the expected
+       // packet this is. ack_nr is the last acked, seq_nr is the
+       // current. Subtracring 1 makes 0 mean "this is the next
+       // expected packet".
+       const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
+
+       // Getting an invalid sequence number?
+       if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
+               if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
+                       conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
+               }
+               LOG_UTPV("    Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr);
+               return 0;
+       }
+
+       // Process acknowledgment
+       // acks is the number of packets that was acked
+       int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
+
+       // this happens when we receive an old ack nr
+       if (acks > conn->cur_window_packets) acks = 0;
+
+       // if we get the same ack_nr as in the last packet
+       // increase the duplicate_ack counter, otherwise reset
+       // it to 0
+       if (conn->cur_window_packets > 0) {
+               if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) &&
+                       conn->cur_window_packets > 0) {
+                       //++conn->duplicate_ack;
+               } else {
+                       conn->duplicate_ack = 0;
+               }
+
+               // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
+               // and fast_resend_seq_nr <= ack_nr + 1
+               //    resend ack_nr + 1
+       }
+
+       // figure out how many bytes were acked
+       size_t acked_bytes = 0;
+
+       // the minimum rtt of all acks
+       // this is the upper limit on the delay we get back
+       // from the other peer. Our delay cannot exceed
+       // the rtt of the packet. If it does, clamp it.
+       // this is done in apply_ledbat_ccontrol()
+       int64 min_rtt = INT64_MAX;
+
+       for (int i = 0; i < acks; ++i) {
+               int seq = conn->seq_nr - conn->cur_window_packets + i;
+               OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
+               if (pkt == 0 || pkt->transmissions == 0) continue;
+               assert((int)(pkt->payload) >= 0);
+               acked_bytes += pkt->payload;
+               min_rtt = min<int64>(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
+       }
+       
+       // count bytes acked by EACK
+       if (selack_ptr != NULL) {
+               acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
+                                                                                                selack_ptr, selack_ptr[-1], min_rtt);
+       }
+
+       LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
+                        conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
+                        seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
+
+       uint64 p;
+
+       if (conn->version == 0) {
+               p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec;
+       } else {
+               p = pf1->tv_usec;
+       }
+
+       conn->last_measured_delay = g_current_ms;
+
+       // get delay in both directions
+       // record the delay to report back
+       const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
+       conn->reply_micro = their_delay;
+       uint32 prev_delay_base = conn->their_hist.delay_base;
+       if (their_delay != 0) conn->their_hist.add_sample(their_delay);
+
+       // if their new delay base is less than their previous one
+       // we should shift our delay base in the other direction in order
+       // to take the clock skew into account
+       if (prev_delay_base != 0 &&
+               wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) {
+               // never adjust more than 10 milliseconds
+               if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
+                       conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
+               }
+       }
+
+       const uint32 actual_delay = conn->version==0
+               ?(pf->reply_micro==INT_MAX?0:uint32(pf->reply_micro))
+               :(uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
+
+       assert(conn->our_hist.get_value() >= 0);
+       // if the actual delay is 0, it means the other end
+       // hasn't received a sample from us yet, and doesn't
+       // know what it is. We can't update out history unless
+       // we have a true measured sample
+       prev_delay_base = conn->our_hist.delay_base;
+       if (actual_delay != 0) conn->our_hist.add_sample(actual_delay);
+       assert(conn->our_hist.get_value() >= 0);
+
+       // if our new delay base is less than our previous one
+       // we should shift the other end's delay base in the other
+       // direction in order to take the clock skew into account
+       // This is commented out because it creates bad interactions
+       // with our adjustment in the other direction. We don't really
+       // need our estimates of the other peer to be very accurate
+       // anyway. The problem with shifting here is that we're more
+       // likely shift it back later because of a low latency. This
+       // second shift back would cause us to shift our delay base
+       // which then get's into a death spiral of shifting delay bases
+/*     if (prev_delay_base != 0 &&
+               wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
+               // never adjust more than 10 milliseconds
+               if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
+                       conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
+               }
+       }
+*/
+       // only apply the congestion controller on acks
+       // if we don't have a delay measurement, there's
+       // no point in invoking the congestion control
+       if (actual_delay != 0 && acked_bytes >= 1)
+               conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt);
+
+       // sanity check, the other end should never ack packets
+       // past the point we've sent
+       if (acks <= conn->cur_window_packets) {
+               conn->max_window_user = conn->version == 0
+                       ? pf->windowsize * PACKET_SIZE : pf1->windowsize;
+
+               // If max user window is set to 0, then we startup a timer
+               // That will reset it to 1 after 15 seconds.
+               if (conn->max_window_user == 0)
+                       // Reset max_window_user to 1 every 15 seconds.
+                       conn->zerowindow_time = g_current_ms + 15000;
+
+               // Respond to connect message
+               // Switch to CONNECTED state.
+               if (conn->state == CS_SYN_SENT) {
+                       conn->state = CS_CONNECTED;
+                       conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);
+
+               // We've sent a fin, and everything was ACKed (including the FIN),
+               // it's safe to destroy the socket. cur_window_packets == acks
+               // means that this packet acked all the remaining packets that
+               // were in-flight.
+               } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
+                       conn->state = CS_DESTROY;
+               }
+
+               // Update fast resend counter
+               if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
+                       conn->fast_resend_seq_nr = pk_ack_nr + 1;
+
+               LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);
+
+               for (int i = 0; i < acks; ++i) {
+                       int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
+                       // if ack_status is 0, the packet was acked.
+                       // if acl_stauts is 1, it means that the packet had already been acked
+                       // if it's 2, the packet has not been sent yet
+                       // We need to break this loop in the latter case. This could potentially
+                       // happen if we get an ack_nr that does not exceed what we have stuffed
+                       // into the outgoing buffer, but does exceed what we have sent
+                       if (ack_status == 2) {
+#ifdef _DEBUG
+                               OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
+                               assert(pkt->transmissions == 0);
+#endif
+                               break;
+                       }
+                       conn->cur_window_packets--;
+               }
+#ifdef _DEBUG
+               if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
+#endif
+
+               // packets in front of this may have been acked by a
+               // selective ack (EACK). Keep decreasing the window packet size
+               // until we hit a packet that is still waiting to be acked
+               // in the send queue
+               // this is especially likely to happen when the other end
+               // has the EACK send bug older versions of uTP had
+               while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets))
+                       conn->cur_window_packets--;
+
+#ifdef _DEBUG
+               if (conn->cur_window_packets == 0) assert(conn->cur_window == 0);
+#endif
+
+               // this invariant should always be true
+               assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
+
+               // flush Nagle
+               if (conn->cur_window_packets == 1) {
+                       OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
+                       // do we still have quota?
+                       if (pkt->transmissions == 0 &&
+                               (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)pkt->length)) {
+                               conn->send_packet(pkt);
+
+                               // No need to send another ack if there is nothing to reorder.
+                               if (conn->reorder_count == 0) {
+                                       conn->sent_ack();
+                               }
+                       }
+               }
+
+               // Fast timeout-retry
+               if (conn->fast_timeout) {
+                       LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
+                       if (((conn->fast_resend_seq_nr - conn->timeout_seq_nr) & ACK_NR_MASK) >= 0 ||
+                               ((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
+                               conn->fast_timeout = false;
+                       } else {
+                               OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
+                               if (pkt && pkt->transmissions > 0) {
+                                       LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets);
+#ifdef _DEBUG
+                                       ++conn->_stats._fastrexmit;
+#endif
+                                       conn->fast_resend_seq_nr++;
+                                       conn->send_packet(pkt);
+                               }
+                       }
+               }
+       }
+
+       // Process selective acknowledgent
+       if (selack_ptr != NULL) {
+               conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
+       }
+
+       // this invariant should always be true
+       assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
+
+       LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d",
+                        conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
+                        conn->send_quota / 100);
+
+       // In case the ack dropped the current window below
+       // the max_window size, Mark the socket as writable
+       if (conn->state == CS_CONNECTED_FULL && conn->is_writable(conn->get_packet_size())) {
+               conn->state = CS_CONNECTED;
+               LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
+                                conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)conn->get_packet_size());
+               conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
+       }
+
+       if (pk_flags == ST_STATE) {
+               // This is a state packet only.
+               return 0;
+       }
+
+       // The connection is not in a state that can accept data?
+       if (conn->state != CS_CONNECTED &&
+               conn->state != CS_CONNECTED_FULL &&
+               conn->state != CS_FIN_SENT) {
+               return 0;
+       }
+
+       // Is this a finalize packet?
+       if (pk_flags == ST_FIN && !conn->got_fin) {
+               LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
+               conn->got_fin = true;
+               conn->eof_pkt = pk_seq_nr;
+               // at this point, it is possible for the
+               // other end to have sent packets with
+               // sequence numbers higher than seq_nr.
+               // if this is the case, our reorder_count
+               // is out of sync. This case is dealt with
+               // when we re-order and hit the eof_pkt.
+               // we'll just ignore any packets with
+               // sequence numbers past this
+       }
+
+       // Getting an in-order packet?
+       if (seqnr == 0) {
+               size_t count = packet_end - data;
+               if (count > 0 && conn->state != CS_FIN_SENT) {
+                       LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata));
+                       // Post bytes to the upper layer
+                       conn->func.on_read(conn->userdata, data, count);
+               }
+               conn->ack_nr++;
+               conn->bytes_since_ack += count;
+
+               // Check if the next packet has been received too, but waiting
+               // in the reorder buffer.
+               for (;;) {
+
+                       if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
+                               if (conn->state != CS_FIN_SENT) {
+                                       conn->state = CS_GOT_FIN;
+                                       conn->rto_timeout = g_current_ms + min<uint>(conn->rto * 3, 60);
+
+                                       LOG_UTPV("0x%08x: Posting EOF", conn);
+                                       conn->func.on_state(conn->userdata, UTP_STATE_EOF);
+                               }
+
+                               // if the other end wants to close, ack immediately
+                               conn->send_ack();
+
+                               // reorder_count is not necessarily 0 at this point.
+                               // even though it is most of the time, the other end
+                               // may have sent packets with higher sequence numbers
+                               // than what later end up being eof_pkt
+                               // since we have received all packets up to eof_pkt
+                               // just ignore the ones after it.
+                               conn->reorder_count = 0;
+                       }
+
+                       // Quick get-out in case there is nothing to reorder
+                       if (conn->reorder_count == 0)
+                               break;
+
+                       // Check if there are additional buffers in the reorder buffers
+                       // that need delivery.
+                       byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
+                       if (p == NULL)
+                               break;
+                       conn->inbuf.put(conn->ack_nr+1, NULL);
+                       count = *(uint*)p;
+                       if (count > 0 && conn->state != CS_FIN_SENT) {
+                               // Pass the bytes to the upper layer
+                               conn->func.on_read(conn->userdata, p + sizeof(uint), count);
+                       }
+                       conn->ack_nr++;
+                       conn->bytes_since_ack += count;
+
+                       // Free the element from the reorder buffer
+                       free(p);
+                       assert(conn->reorder_count > 0);
+                       conn->reorder_count--;
+               }
+
+               // start the delayed ACK timer
+               conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
+       } else {
+               // Getting an out of order packet.
+               // The packet needs to be remembered and rearranged later.
+
+               // if we have received a FIN packet, and the EOF-sequence number
+               // is lower than the sequence number of the packet we just received
+               // something is wrong.
+               if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
+                       LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF "
+                               "reorder_count:%u len:%u (rb:%u)",
+                               conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+                       return 0;
+               }
+
+               // if the sequence number is entirely off the expected
+               // one, just drop it. We can't allocate buffer space in
+               // the inbuf entirely based on untrusted input
+               if (seqnr > 0x3ff) {
+                       LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off "
+                               "reorder_count:%u len:%u (rb:%u)",
+                               conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+                       return 0;
+               }
+
+               // we need to grow the circle buffer before we
+               // check if the packet is already in here, so that
+               // we don't end up looking at an older packet (since
+               // the indices wraps around).
+               conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
+
+               // Has this packet already been received? (i.e. a duplicate)
+               // If that is the case, just discard it.
+               if (conn->inbuf.get(pk_seq_nr) != NULL) {
+#ifdef _DEBUG
+                       ++conn->_stats._nduprecv;
+#endif
+                       return 0;
+               }
+
+               // Allocate memory to fit the packet that needs to re-ordered
+               byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
+               *(uint*)mem = (uint)(packet_end - data);
+               memcpy(mem + sizeof(uint), data, packet_end - data);
+
+               // Insert into reorder buffer and increment the count
+               // of # of packets to be reordered.
+               // we add one to seqnr in order to leave the last
+               // entry empty, that way the assert in send_ack
+               // is valid. we have to add one to seqnr too, in order
+               // to make the circular buffer grow around the correct
+               // point (which is conn->ack_nr + 1).
+               assert(conn->inbuf.get(pk_seq_nr) == NULL);
+               assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
+               conn->inbuf.put(pk_seq_nr, mem);
+               conn->reorder_count++;
+
+               LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
+                       conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
+
+               // Setup so the partial ACK message will get sent immediately.
+               conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, 1);
+       }
+
+       // If ack_time or ack_bytes indicate that we need to send and ack, send one
+       // here instead of waiting for the timer to trigger
+       LOG_UTPV("bytes_since_ack:%u ack_time:%d",
+                        (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
+       if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) {
+               if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
+                       (int)(g_current_ms - conn->ack_time) >= 0) {
+                       conn->send_ack();
+               }
+       }
+       return (size_t)(packet_end - data);
+}
+
+inline bool UTP_IsV1(PacketFormatV1 const* pf)
+{
+       return pf->version == 1 && pf->type < ST_NUM_STATES && pf->ext < 3;
+}
+
+void UTP_Free(UTPSocket *conn)
+{
+       LOG_UTPV("0x%08x: Killing socket", conn);
+
+       conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
+       UTP_SetCallbacks(conn, NULL, NULL);
+
+       assert(conn->idx < g_utp_sockets.GetCount());
+       assert(g_utp_sockets[conn->idx] == conn);
+
+       // Unlink object from the global list
+       assert(g_utp_sockets.GetCount() > 0);
+
+       UTPSocket *last = g_utp_sockets[g_utp_sockets.GetCount() - 1];
+
+       assert(last->idx < g_utp_sockets.GetCount());
+       assert(g_utp_sockets[last->idx] == last);
+
+       last->idx = conn->idx;
+       
+       g_utp_sockets[conn->idx] = last;
+
+       // Decrease the count
+       g_utp_sockets.SetCount(g_utp_sockets.GetCount() - 1);
+
+       // Free all memory occupied by the socket object.
+       for (size_t i = 0; i <= conn->inbuf.mask; i++) {
+               free(conn->inbuf.elements[i]);
+       }
+       for (size_t i = 0; i <= conn->outbuf.mask; i++) {
+               free(conn->outbuf.elements[i]);
+       }
+       free(conn->inbuf.elements);
+       free(conn->outbuf.elements);
+
+       // Finally free the socket object
+       free(conn);
+}
+
+
+// Public functions:
+///////////////////////////////////////////////////////////////////////////////
+
+// Create a UTP socket
+UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
+{
+       UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket));
+
+       g_current_ms = UTP_GetMilliseconds();
+
+       UTP_SetCallbacks(conn, NULL, NULL);
+       conn->our_hist.clear();
+       conn->their_hist.clear();
+       conn->rto = 3000;
+       conn->rtt_var = 800;
+       conn->seq_nr = 1;
+       conn->ack_nr = 0;
+       conn->max_window_user = 255 * PACKET_SIZE;
+       conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
+       conn->send_to_proc = send_to_proc;
+       conn->send_to_userdata = send_to_userdata;
+       conn->ack_time = g_current_ms + 0x70000000;
+       conn->last_got_packet = g_current_ms;
+       conn->last_sent_packet = g_current_ms;
+       conn->last_measured_delay = g_current_ms + 0x70000000;
+       conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY;
+       conn->last_send_quota = g_current_ms;
+       conn->send_quota = PACKET_SIZE * 100;
+       conn->cur_window_packets = 0;
+       conn->fast_resend_seq_nr = conn->seq_nr;
+
+       // default to version 1
+       UTP_SetSockopt(conn, SO_UTPVERSION, 1);
+
+       // we need to fit one packet in the window
+       // when we start the connection
+       conn->max_window = conn->get_packet_size();
+       conn->state = CS_IDLE;
+
+       conn->outbuf.mask = 15;
+       conn->inbuf.mask = 15;
+
+       conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
+       conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
+
+       conn->idx = g_utp_sockets.Append(conn);
+
+       LOG_UTPV("0x%08x: UTP_Create", conn);
+
+       return conn;
+}
+
+void UTP_SetCallbacks(UTPSocket *conn, UTPFunctionTable *funcs, void *userdata)
+{
+       assert(conn);
+
+       if (funcs == NULL) {
+               funcs = &zero_funcs;
+       }
+       conn->func = *funcs;
+       conn->userdata = userdata;
+}
+
+bool UTP_SetSockopt(UTPSocket* conn, int opt, int val)
+{
+       assert(conn);
+
+       switch (opt) {
+       case SO_SNDBUF:
+               assert(val >= 1);
+               conn->opt_sndbuf = val;
+               return true;
+       case SO_RCVBUF:
+               conn->opt_rcvbuf = val;
+               return true;
+       case SO_UTPVERSION:
+               assert(conn->state == CS_IDLE);
+               if (conn->state != CS_IDLE) {
+                       // too late
+                       return false;
+               }
+               if (conn->version == 1 && val == 0) {
+                       conn->reply_micro = INT_MAX;
+                       conn->opt_rcvbuf = 200 * 1024;
+                       conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE;
+               } else if (conn->version == 0 && val == 1) {
+                       conn->reply_micro = 0;
+                       conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024;
+                       conn->opt_sndbuf = conn->opt_rcvbuf;
+               }
+               conn->version = val;
+               return true;
+       }
+
+       return false;
+}
+
+// Try to connect to a specified host.
+// 'initial' is the number of data bytes to send in the connect packet.
+void UTP_Connect(UTPSocket *conn)
+{
+       assert(conn);
+
+       assert(conn->state == CS_IDLE);
+       assert(conn->cur_window_packets == 0);
+       assert(conn->outbuf.get(conn->seq_nr) == NULL);
+       assert(sizeof(PacketFormatV1) == 20);
+
+       conn->state = CS_SYN_SENT;
+
+       g_current_ms = UTP_GetMilliseconds();
+
+       // Create and send a connect message
+       uint32 conn_seed = UTP_Random();
+
+       // we identify newer versions by setting the
+       // first two bytes to 0x0001
+       if (conn->version > 0) {
+               conn_seed &= 0xffff;
+       }
+
+       // used in parse_log.py
+       LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) "
+                       "target_delay:%u (ms) delay_history:%u "
+                       "delay_base_history:%u (minutes)",
+                       conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000,
+                       CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
+
+       // Setup initial timeout timer.
+       conn->retransmit_timeout = 3000;
+       conn->rto_timeout = g_current_ms + conn->retransmit_timeout;
+       conn->last_rcv_win = conn->get_rcv_window();
+
+       conn->conn_seed = conn_seed;
+       conn->conn_id_recv = conn_seed;
+       conn->conn_id_send = conn_seed+1;
+       // if you need compatibiltiy with 1.8.1, use this. it increases attackability though.
+       //conn->seq_nr = 1;
+       conn->seq_nr = UTP_Random();
+
+       // Create the connect packet.
+       const size_t header_ext_size = conn->get_header_extensions_size();
+
+       OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_ext_size);
+
+       PacketFormatExtensions* p = (PacketFormatExtensions*)pkt->data;
+       PacketFormatExtensionsV1* p1 = (PacketFormatExtensionsV1*)pkt->data;
+
+       memset(p, 0, header_ext_size);
+       // SYN packets are special, and have the receive ID in the connid field,
+       // instead of conn_id_send.
+       if (conn->version == 0) {
+               p->pf.connid = conn->conn_id_recv;
+               p->pf.ext = 2;
+               p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE);
+               p->pf.seq_nr = conn->seq_nr;
+               p->pf.flags = ST_SYN;
+               p->ext_next = 0;
+               p->ext_len = 8;
+               memset(p->extensions, 0, 8);
+       } else {
+               p1->pf.version = 1;
+               p1->pf.type = ST_SYN;
+               p1->pf.ext = 2;
+               p1->pf.connid = conn->conn_id_recv;
+               p1->pf.windowsize = (uint32)conn->last_rcv_win;
+               p1->pf.seq_nr = conn->seq_nr;
+               p1->ext_next = 0;
+               p1->ext_len = 8;
+               memset(p1->extensions, 0, 8);
+       }
+       pkt->transmissions = 0;
+       pkt->length = header_ext_size;
+       pkt->payload = 0;
+
+       //LOG_UTPV("0x%08x: Sending connect %s [%u].",
+       //               conn, addrfmt(conn->addr, addrbuf), conn_seed);
+
+       // Remember the message in the outgoing queue.
+       conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
+       conn->outbuf.put(conn->seq_nr, pkt);
+       conn->seq_nr++;
+       conn->cur_window_packets++;
+
+       conn->send_packet(pkt);
+}
+
+bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
+                                          SendToProc *send_to_proc, void *send_to_userdata,
+                                          const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
+{
+       const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
+
+       if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) {
+               LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
+               return false;
+       }
+
+       const PacketFormat* p = (PacketFormat*)buffer;
+       const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
+
+       const byte version = UTP_IsV1(p1);
+       const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
+
+       if (version == 0 && len < sizeof(PacketFormat)) {
+               LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
+               return false;
+       }
+
+       if (version == 1 && len < sizeof(PacketFormatV1)) {
+               LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
+               return false;
+       }
+
+       LOG_UTPV("recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
+
+       const PacketFormat *pf = (PacketFormat*)p;
+       const PacketFormatV1 *pf1 = (PacketFormatV1*)p;
+
+       if (version == 0) {
+               LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf->seq_nr, (uint)pf->ack_nr);
+       } else {
+               LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
+       }
+
+       const byte flags = version == 0 ? pf->flags : pf1->type;
+
+       for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) {
+               UTPSocket *conn = g_utp_sockets[i];
+               //LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
+               //              addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id);
+               if (conn->addr != addr)
+                       continue;
+
+               if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
+                       LOG_UTPV("0x%08x: recv RST for existing connection", conn);
+                       if (!conn->userdata || conn->state == CS_FIN_SENT) {
+                               conn->state = CS_DESTROY;
+                       } else {
+                               conn->state = CS_RESET;
+                       }
+                       if (conn->userdata) {
+                               conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(),
+                                                                          close_overhead);
+                               const int err = conn->state == CS_SYN_SENT ?
+                                       ECONNREFUSED :
+                                       ECONNRESET;
+                               conn->func.on_error(conn->userdata, err);
+                       }
+                       return true;
+               } else if (flags != ST_SYN && conn->conn_id_recv == id) {
+                       LOG_UTPV("0x%08x: recv processing", conn);
+                       const size_t read = UTP_ProcessIncoming(conn, buffer, len);
+                       if (conn->userdata) {
+                               conn->func.on_overhead(conn->userdata, false,
+                                       (len - read) + conn->get_udp_overhead(),
+                                       header_overhead);
+                       }
+                       return true;
+               }
+       }
+
+       if (flags == ST_RESET) {
+               LOG_UTPV("recv RST for unknown connection");
+               return true;
+       }
+
+       const uint32 seq_nr = version == 0 ? pf->seq_nr : pf1->seq_nr;
+       if (flags != ST_SYN) {
+               for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
+                       if (g_rst_info[i].connid != id)
+                               continue;
+                       if (g_rst_info[i].addr != addr)
+                               continue;
+                       if (seq_nr != g_rst_info[i].ack_nr)
+                               continue;
+                       g_rst_info[i].timestamp = UTP_GetMilliseconds();
+                       LOG_UTPV("recv not sending RST to non-SYN (stored)");
+                       return true;
+               }
+               if (g_rst_info.GetCount() > RST_INFO_LIMIT) {
+                       LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)g_rst_info.GetCount());
+                       return true;
+               }
+               LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)g_rst_info.GetCount());
+               RST_Info &r = g_rst_info.Append();
+               r.addr = addr;
+               r.connid = id;
+               r.ack_nr = seq_nr;
+               r.timestamp = UTP_GetMilliseconds();
+
+               UTPSocket::send_rst(send_to_proc, send_to_userdata, addr, id, seq_nr, UTP_Random(), version);
+               return true;
+       }
+
+       if (incoming_proc) {
+               LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version);
+
+               // Create a new UTP socket to handle this new connection
+               UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
+               // Need to track this value to be able to detect duplicate CONNECTs
+               conn->conn_seed = id;
+               // This is value that identifies this connection for them.
+               conn->conn_id_send = id;
+               // This is value that identifies this connection for us.
+               conn->conn_id_recv = id+1;
+               conn->ack_nr = seq_nr;
+               conn->seq_nr = UTP_Random();
+               conn->fast_resend_seq_nr = conn->seq_nr;
+
+               UTP_SetSockopt(conn, SO_UTPVERSION, version);
+               conn->state = CS_CONNECTED;
+
+               const size_t read = UTP_ProcessIncoming(conn, buffer, len, true);
+
+               LOG_UTPV("0x%08x: recv send connect ACK", conn);
+               conn->send_ack(true);
+
+               incoming_proc(send_to_userdata, conn);
+
+               // we report overhead after incoming_proc, because the callbacks are setup now
+               if (conn->userdata) {
+                       // SYN
+                       conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(),
+                                                                  header_overhead);
+                       // SYNACK
+                       conn->func.on_overhead(conn->userdata, true, conn->get_overhead(),
+                                                                  ack_overhead);
+               }
+       }
+
+       return true;
+}
+
+bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
+{
+       const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
+
+       // Want the whole packet so we have connection ID
+       if (len < sizeof(PacketFormat)) {
+               return false;
+       }
+
+       const PacketFormat* p = (PacketFormat*)buffer;
+       const PacketFormatV1* p1 = (PacketFormatV1*)buffer;
+
+       const byte version = UTP_IsV1(p1);
+       const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);
+
+       for (size_t i = 0; i < g_utp_sockets.GetCount(); ++i) {
+               UTPSocket *conn = g_utp_sockets[i];
+               if (conn->addr == addr &&
+                       conn->conn_id_recv == id) {
+                       // Don't pass on errors for idle/closed connections
+                       if (conn->state != CS_IDLE) {
+                               if (!conn->userdata || conn->state == CS_FIN_SENT) {
+                                       LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
+                                       conn->state = CS_DESTROY;
+                               } else {
+                                       conn->state = CS_RESET;
+                               }
+                               if (conn->userdata) {
+                                       const int err = conn->state == CS_SYN_SENT ?
+                                               ECONNREFUSED :
+                                               ECONNRESET;
+                                       LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
+                                       conn->func.on_error(conn->userdata, err);
+                               }
+                       }
+                       return true;
+               }
+       }
+       return false;
+}
+
+// Write bytes to the UTP socket.
+// Returns true if the socket is still writable.
+bool UTP_Write(UTPSocket *conn, size_t bytes)
+{
+       assert(conn);
+
+#ifdef g_log_utp_verbose
+       size_t param = bytes;
+#endif
+
+       if (conn->state != CS_CONNECTED) {
+               LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
+               return false;
+       }
+
+       g_current_ms = UTP_GetMilliseconds();
+
+       conn->update_send_quota();
+
+       // don't send unless it will all fit in the window
+       size_t packet_size = conn->get_packet_size();
+       size_t num_to_send = min<size_t>(bytes, packet_size);
+       while (conn->is_writable(num_to_send)) {
+               // Send an outgoing packet.
+               // Also add it to the outgoing of packets that have been sent but not ACKed.
+
+               if (num_to_send == 0) {
+                       LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
+                       return true;
+               }
+               bytes -= num_to_send;
+
+               LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u",
+                                conn, conn->seq_nr, conn->ack_nr,
+                                (uint)(conn->cur_window + num_to_send),
+                                (uint)conn->max_window, (uint)conn->max_window_user,
+                                (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100,
+                                conn->cur_window_packets);
+               conn->write_outgoing_packet(num_to_send, ST_DATA);
+               num_to_send = min<size_t>(bytes, packet_size);
+       }
+
+       // mark the socket as not being writable.
+       conn->state = CS_CONNECTED_FULL;
+       LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
+       return false;
+}
+
+void UTP_RBDrained(UTPSocket *conn)
+{
+       assert(conn);
+
+       const size_t rcvwin = conn->get_rcv_window();
+
+       if (rcvwin > conn->last_rcv_win) {
+               // If last window was 0 send ACK immediately, otherwise should set timer
+               if (conn->last_rcv_win == 0) {
+                       conn->send_ack();
+               } else {
+                       conn->ack_time = g_current_ms + min<uint>(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
+               }
+       }
+}
+
+void UTP_CheckTimeouts()
+{
+       g_current_ms = UTP_GetMilliseconds();
+
+       for (size_t i = 0; i < g_rst_info.GetCount(); i++) {
+               if ((int)(g_current_ms - g_rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
+                       g_rst_info.MoveUpLast(i);
+                       i--;
+               }
+       }
+       if (g_rst_info.GetCount() != g_rst_info.GetAlloc()) {
+               g_rst_info.Compact();
+       }
+
+       for (size_t i = 0; i != g_utp_sockets.GetCount(); i++) {
+               UTPSocket *conn = g_utp_sockets[i];
+               conn->check_timeouts();
+
+               // Check if the object was deleted
+               if (conn->state == CS_DESTROY) {
+                       LOG_UTPV("0x%08x: Destroying", conn);
+                       UTP_Free(conn);
+                       i--;
+               }
+       }
+}
+
+size_t UTP_GetPacketSize(UTPSocket *socket)
+{
+       return socket->get_packet_size();
+}
+
+void UTP_GetPeerName(UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
+{
+       assert(conn);
+
+       socklen_t len;
+       const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
+       *addrlen = min(len, *addrlen);
+       memcpy(addr, &sa, *addrlen);
+}
+
+void UTP_GetDelays(UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
+{
+       assert(conn);
+
+       if (ours) *ours = conn->our_hist.get_value();
+       if (theirs) *theirs = conn->their_hist.get_value();
+       if (age) *age = g_current_ms - conn->last_measured_delay;
+}
+
+#ifdef _DEBUG
+void UTP_GetStats(UTPSocket *conn, UTPStats *stats)
+{
+       assert(conn);
+
+       *stats = conn->_stats;
+}
+#endif // _DEBUG
+
+void UTP_GetGlobalStats(UTPGlobalStats *stats)
+{
+       *stats = _global_stats;
+}
+
+// Close the UTP socket.
+// It is not valid for the upper layer to refer to socket after it is closed.
+// Data will keep to try being delivered after the close.
+void UTP_Close(UTPSocket *conn)
+{
+       assert(conn);
+
+       assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);
+
+       LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);
+
+       switch(conn->state) {
+       case CS_CONNECTED:
+       case CS_CONNECTED_FULL:
+               conn->state = CS_FIN_SENT;
+               conn->write_outgoing_packet(0, ST_FIN);
+               break;
+
+       case CS_SYN_SENT:
+               conn->rto_timeout = UTP_GetMilliseconds() + min<uint>(conn->rto * 2, 60);
+       case CS_GOT_FIN:
+               conn->state = CS_DESTROY_DELAY;
+               break;
+
+       default:
+               conn->state = CS_DESTROY;
+               break;
+       }
+}
diff --git a/third-party/libutp/utp.h b/third-party/libutp/utp.h
new file mode 100644 (file)
index 0000000..9c1c6f9
--- /dev/null
@@ -0,0 +1,163 @@
+#ifndef __UTP_H__
+#define __UTP_H__
+
+#include "utypes.h"
+
+#ifdef WIN32
+#define _CRT_SECURE_NO_DEPRECATE
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#pragma comment(lib,"ws2_32.lib")
+#else
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct UTPSocket;
+
+// Used to set sockopt on a uTP socket to set the version of uTP
+// to use for outgoing connections. This can only be called before
+// the uTP socket is connected
+#define SO_UTPVERSION 99
+
+enum {
+       // socket has reveived syn-ack (notification only for outgoing connection completion)
+       // this implies writability
+       UTP_STATE_CONNECT = 1,
+
+       // socket is able to send more data
+       UTP_STATE_WRITABLE = 2,
+
+       // connection closed
+       UTP_STATE_EOF = 3,
+
+       // socket is being destroyed, meaning all data has been sent if possible.
+       // it is not valid to refer to the socket after this state change occurs
+       UTP_STATE_DESTROYING = 4,
+};
+
+// Callbacks called by a uTP socket (register with UTP_SetCallbacks)
+
+// The uTP socket layer calls this when bytes have been received from the network.
+typedef void UTPOnReadProc(void *userdata, const byte *bytes, size_t count);
+
+// The uTP socket layer calls this to fill the outgoing buffer with bytes.
+// The uTP layer takes responsibility that those bytes will be delivered.
+typedef void UTPOnWriteProc(void *userdata, byte *bytes, size_t count);
+
+// The uTP socket layer calls this to retrieve number of bytes currently in read buffer
+typedef size_t UTPGetRBSize(void *userdata);
+
+// The uTP socket layer calls this whenever the socket becomes writable.
+typedef void UTPOnStateChangeProc(void *userdata, int state);
+
+// The uTP socket layer calls this when an error occurs on the socket.
+// These errors currently include ECONNREFUSED, ECONNRESET and ETIMEDOUT, but
+// could eventually include any BSD socket error.
+typedef void UTPOnErrorProc(void *userdata, int errcode);
+
+// The uTP socket layer calls this to report overhead statistics
+typedef void UTPOnOverheadProc(void *userdata, bool send, size_t count, int type);
+
+struct UTPFunctionTable {
+       UTPOnReadProc *on_read;
+       UTPOnWriteProc *on_write;
+       UTPGetRBSize *get_rb_size;
+       UTPOnStateChangeProc *on_state;
+       UTPOnErrorProc *on_error;
+       UTPOnOverheadProc *on_overhead;
+};
+
+
+// The uTP socket layer calls this when a new incoming uTP connection is established
+// this implies writability
+typedef void UTPGotIncomingConnection(void *userdata, struct UTPSocket* s);
+
+// The uTP socket layer calls this to send UDP packets
+typedef void SendToProc(void *userdata, const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen);
+
+
+// Functions which can be called with a uTP socket
+
+// Create a uTP socket
+struct UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata,
+                                         const struct sockaddr *addr, socklen_t addrlen);
+
+// Setup the callbacks - must be done before connect or on incoming connection
+void UTP_SetCallbacks(struct UTPSocket *socket, struct UTPFunctionTable *func, void *userdata);
+
+// Valid options include SO_SNDBUF, SO_RCVBUF and SO_UTPVERSION
+bool UTP_SetSockopt(struct UTPSocket *socket, int opt, int val);
+
+// Try to connect to a specified host.
+void UTP_Connect(struct UTPSocket *socket);
+
+// Process a UDP packet from the network. This will process a packet for an existing connection,
+// or create a new connection and call incoming_proc. Returns true if the packet was processed
+// in some way, false if the packet did not appear to be uTP.
+bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
+                                          SendToProc *send_to_proc, void *send_to_userdata,
+                                          const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen);
+
+// Process an ICMP received UDP packet.
+bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen);
+
+// Write bytes to the uTP socket.
+// Returns true if the socket is still writable.
+bool UTP_Write(struct UTPSocket *socket, size_t count);
+
+// Notify the uTP socket of buffer drain
+void UTP_RBDrained(struct UTPSocket *socket);
+
+// Call periodically to process timeouts and other periodic events
+void UTP_CheckTimeouts(void);
+
+// Retrieves the peer address of the specified socket, stores this address in the
+// sockaddr structure pointed to by the addr argument, and stores the length of this
+// address in the object pointed to by the addrlen argument.
+void UTP_GetPeerName(struct UTPSocket *socket, struct sockaddr *addr, socklen_t *addrlen);
+
+void UTP_GetDelays(struct UTPSocket *socket, int32 *ours, int32 *theirs, uint32 *age);
+
+size_t UTP_GetPacketSize(struct UTPSocket *socket);
+
+#ifdef _DEBUG
+struct UTPStats {
+       uint64 _nbytes_recv;    // total bytes received
+       uint64 _nbytes_xmit;    // total bytes transmitted
+       uint32 _rexmit;         // retransmit counter
+       uint32 _fastrexmit;     // fast retransmit counter
+       uint32 _nxmit;          // transmit counter
+       uint32 _nrecv;          // receive counter (total)
+       uint32 _nduprecv;       // duplicate receive counter
+};
+
+// Get stats for UTP socket
+void UTP_GetStats(struct UTPSocket *socket, UTPStats *stats);
+#endif
+
+// Close the UTP socket.
+// It is not valid to issue commands for this socket after it is closed.
+// This does not actually destroy the socket until outstanding data is sent, at which
+// point the socket will change to the UTP_STATE_DESTROYING state.
+void UTP_Close(struct UTPSocket *socket);
+
+struct UTPGlobalStats {
+       uint32 _nraw_recv[5];   // total packets recieved less than 300/600/1200/MTU bytes fpr all connections (global)
+       uint32 _nraw_send[5];   // total packets sent less than 300/600/1200/MTU bytes for all connections (global)
+};
+
+void UTP_GetGlobalStats(struct UTPGlobalStats *stats);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //__UTP_H__
diff --git a/third-party/libutp/utp_config.h b/third-party/libutp/utp_config.h
new file mode 100644 (file)
index 0000000..bc804ac
--- /dev/null
@@ -0,0 +1,37 @@
+#define CCONTROL_TARGET (100 * 1000) // us
+#define RATE_CHECK_INTERVAL 10000 // ms
+#define DYNAMIC_PACKET_SIZE_ENABLED false
+#define DYNAMIC_PACKET_SIZE_FACTOR 2
+// This should return the global number of bytes sent, used for determining dynamic
+// packet size based on rate
+uint64 UTP_GetGlobalUTPBytesSent(const struct sockaddr *remote, socklen_t remotelen) { return 0; }
+
+enum bandwidth_type_t {
+       payload_bandwidth, connect_overhead,
+       close_overhead, ack_overhead,
+       header_overhead, retransmit_overhead
+};
+
+#ifdef WIN32
+#define I64u "%I64u"
+#else
+#define I64u "%Lu"
+#endif
+#ifdef WIN32
+#define snprintf _snprintf
+#endif
+
+#define g_log_utp 0
+#define g_log_utp_verbose 0
+void utp_log(char const* fmt, ...)
+{
+       /*
+       printf("[%u] ", UTP_GetMilliseconds());
+       va_list vl;
+       va_start(vl, fmt);
+       vprintf(fmt, vl);
+       va_end(vl);
+       puts("");
+       fflush(stdout);
+       */
+};
diff --git a/third-party/libutp/utp_config_example.h b/third-party/libutp/utp_config_example.h
new file mode 100644 (file)
index 0000000..9573f8b
--- /dev/null
@@ -0,0 +1,26 @@
+#define CCONTROL_TARGET (100 * 1000) // us
+#define RATE_CHECK_INTERVAL 10000 // ms
+#define DYNAMIC_PACKET_SIZE_ENABLED false
+#define DYNAMIC_PACKET_SIZE_FACTOR 2
+// This should return the global number of bytes sent, used for determining dynamic
+// packet size based on rate
+uint64 UTP_GetGlobalUTPBytesSent(const struct sockaddr *remote, socklen_t remotelen) { return 0; }
+
+enum bandwidth_type_t {
+       payload_bandwidth, connect_overhead,
+       close_overhead, ack_overhead,
+       header_overhead, retransmit_overhead
+};
+
+#ifdef WIN32
+#define I64u "%I64u"
+#else
+#define I64u "%Lu"
+#endif
+#ifdef WIN32
+#define snprintf _snprintf
+#endif
+
+#define g_log_utp 0
+#define g_log_utp_verbose 0
+void utp_log(char const* fmt, ...);
diff --git a/third-party/libutp/utp_utils.cpp b/third-party/libutp/utp_utils.cpp
new file mode 100644 (file)
index 0000000..235303a
--- /dev/null
@@ -0,0 +1,208 @@
+#include "StdAfx.h"
+
+#include "utypes.h"
+#include <assert.h>
+#include <stdlib.h>
+
+#ifdef WIN32
+
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+typedef ULONGLONG (WINAPI GetTickCount64Proc)(void);
+static GetTickCount64Proc *pt2GetTickCount64;
+static GetTickCount64Proc *pt2RealGetTickCount;
+
+static uint64 startPerformanceCounter;
+static uint64 startGetTickCount;
+// MSVC 6 standard doesn't like division with uint64s
+static double counterPerMicrosecond;
+
+uint64 UTGetTickCount64()
+{
+       if (pt2GetTickCount64) {
+               return pt2GetTickCount64();
+       }
+       if (pt2RealGetTickCount) {
+               uint64 v = pt2RealGetTickCount();
+               // fix return value from GetTickCount
+               return (DWORD)v | ((v >> 0x18) & 0xFFFFFFFF00000000);
+       }
+       return (uint64)GetTickCount();
+}
+
+uint32 UTP_GetMilliseconds()
+{
+       return GetTickCount();
+}
+
+void Time_Initialize()
+{
+       HMODULE kernel32 = GetModuleHandleA("kernel32.dll");
+       pt2GetTickCount64 = (GetTickCount64Proc*)GetProcAddress(kernel32, "GetTickCount64");
+       // not a typo. GetTickCount actually returns 64 bits
+       pt2RealGetTickCount = (GetTickCount64Proc*)GetProcAddress(kernel32, "GetTickCount");
+
+       uint64 frequency;
+       QueryPerformanceCounter((LARGE_INTEGER*)&startPerformanceCounter);
+       QueryPerformanceFrequency((LARGE_INTEGER*)&frequency);
+       counterPerMicrosecond = (double)frequency / 1000000.0f;
+       startGetTickCount = UTGetTickCount64();
+}
+
+int64 abs64(int64 x) { return x < 0 ? -x : x; }
+
+uint64 UTP_GetMicroseconds()
+{
+       static bool time_init = false;
+       if (!time_init) {
+               time_init = true;
+               Time_Initialize();
+       }
+
+       uint64 counter;
+       uint64 tick;
+
+       QueryPerformanceCounter((LARGE_INTEGER*) &counter);
+       tick = UTGetTickCount64();
+
+       // unfortunately, QueryPerformanceCounter is not guaranteed
+       // to be monotonic. Make it so.
+       int64 ret = (int64)(((int64)counter - (int64)startPerformanceCounter) / counterPerMicrosecond);
+       // if the QPC clock leaps more than one second off GetTickCount64()
+       // something is seriously fishy. Adjust QPC to stay monotonic
+       int64 tick_diff = tick - startGetTickCount;
+       if (abs64(ret / 100000 - tick_diff / 100) > 10) {
+               startPerformanceCounter -= (uint64)((int64)(tick_diff * 1000 - ret) * counterPerMicrosecond);
+               ret = (int64)((counter - startPerformanceCounter) / counterPerMicrosecond);
+       }
+       return ret;
+}
+
+#else //!WIN32
+
+#include <time.h>
+#include <sys/time.h>          // Linux needs both time.h and sys/time.h
+#include <stdlib.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#if defined(__APPLE__)
+#include <mach/mach_time.h>
+
+uint64 UTP_GetMicroseconds()
+{
+       // http://developer.apple.com/mac/library/qa/qa2004/qa1398.html
+       // http://www.macresearch.org/tutorial_performance_and_time
+       static mach_timebase_info_data_t sTimebaseInfo;
+       static uint64_t start_tick = 0;
+       uint64_t tick;
+       // Returns a counter in some fraction of a nanoseconds
+       tick = mach_absolute_time();  
+       if (sTimebaseInfo.denom == 0) {
+               // Get the timer ratio to convert mach_absolute_time to nanoseconds
+               mach_timebase_info(&sTimebaseInfo); 
+               start_tick = tick;
+       }
+       // Calculate the elapsed time, convert it to microseconds and return it.
+       return ((tick - start_tick) * sTimebaseInfo.numer) / (sTimebaseInfo.denom * 1000);
+}
+
+#else
+
+/* Unfortunately, #ifdef CLOCK_MONOTONIC is not enough to make sure that
+   POSIX clocks work -- we could be running a recent libc with an ancient
+   kernel (think OpenWRT). -- jch */
+
+uint64 UTP_GetMicroseconds()
+{
+       static int have_posix_clocks = -1;
+       int rc;
+
+#if defined(_POSIX_TIMERS) && _POSIX_TIMERS > 0 && defined(CLOCK_MONOTONIC)
+       if (have_posix_clocks < 0) {
+               struct timespec ts;
+               rc = clock_gettime(CLOCK_MONOTONIC, &ts);
+               if (rc < 0) {
+                       have_posix_clocks = 0;
+               } else {
+                       have_posix_clocks = 1;
+               }
+       }
+
+       if (have_posix_clocks) {
+               struct timespec ts;
+               rc = clock_gettime(CLOCK_MONOTONIC, &ts);
+               return uint64(ts.tv_sec) * 1000000 + ts.tv_nsec / 1000;
+       }
+#endif
+       {
+               static time_t offset = 0, previous = 0;
+               struct timeval tv;
+               rc = gettimeofday(&tv, NULL);
+               tv.tv_sec += offset;
+               if (previous > tv.tv_sec) {
+                       offset += previous - tv.tv_sec;
+                       tv.tv_sec = previous;
+               }
+               previous = tv.tv_sec;
+               return uint64(tv.tv_sec) * 1000000 + tv.tv_usec;
+       }
+}
+#endif
+
+uint32 UTP_GetMilliseconds()
+{
+       return UTP_GetMicroseconds() / 1000;
+}
+
+#endif
+
+
+#define ETHERNET_MTU 1500
+#define IPV4_HEADER_SIZE 20
+#define IPV6_HEADER_SIZE 40
+#define UDP_HEADER_SIZE 8
+#define GRE_HEADER_SIZE 24
+#define PPPOE_HEADER_SIZE 8
+#define MPPE_HEADER_SIZE 2
+// packets have been observed in the wild that were fragmented
+// with a payload of 1416 for the first fragment
+// There are reports of routers that have MTU sizes as small as 1392
+#define FUDGE_HEADER_SIZE 36
+#define TEREDO_MTU 1280
+
+#define UDP_IPV4_OVERHEAD (IPV4_HEADER_SIZE + UDP_HEADER_SIZE)
+#define UDP_IPV6_OVERHEAD (IPV6_HEADER_SIZE + UDP_HEADER_SIZE)
+#define UDP_TEREDO_OVERHEAD (UDP_IPV4_OVERHEAD + UDP_IPV6_OVERHEAD)
+
+#define UDP_IPV4_MTU (ETHERNET_MTU - IPV4_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE - PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE)
+#define UDP_IPV6_MTU (ETHERNET_MTU - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE - PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE)
+#define UDP_TEREDO_MTU (TEREDO_MTU - UDP_HEADER_SIZE)
+
+uint16 UTP_GetUDPMTU(const struct sockaddr *remote, socklen_t remotelen)
+{
+       // Since we don't know the local address of the interface,
+       // be conservative and assume all IPv6 connections are Teredo.
+       return remote->sa_family == AF_INET6 ? UDP_TEREDO_MTU : UDP_IPV4_MTU;
+}
+
+uint16 UTP_GetUDPOverhead(const struct sockaddr *remote, socklen_t remotelen)
+{
+       // Since we don't know the local address of the interface,
+       // be conservative and assume all IPv6 connections are Teredo.
+       return remote->sa_family == AF_INET6 ? UDP_TEREDO_OVERHEAD : UDP_IPV4_OVERHEAD;
+}
+
+uint32 UTP_Random()
+{
+       return rand();
+}
+
+void UTP_DelaySample(const struct sockaddr *remote, int sample_ms) {}
+size_t UTP_GetPacketSize(const struct sockaddr *remote) { return 1500; }
+
diff --git a/third-party/libutp/utp_utils.h b/third-party/libutp/utp_utils.h
new file mode 100644 (file)
index 0000000..585a7e2
--- /dev/null
@@ -0,0 +1,16 @@
+// This should return the MTU to the destination
+uint16 UTP_GetUDPMTU(const struct sockaddr *remote, socklen_t remotelen);
+// This should return the number of bytes of UDP overhead for one packet to the
+// destination, for overhead calculation only
+uint16 UTP_GetUDPOverhead(const struct sockaddr *remote, socklen_t remotelen);
+// This should return monotonically increasing milliseconds, start point does not matter
+uint32 UTP_GetMilliseconds();
+// This should return monotonically increasing microseconds, start point does not matter
+uint64 UTP_GetMicroseconds();
+// This should return a random uint32
+uint32 UTP_Random();
+// This is called every time we have a delay sample is made
+void UTP_DelaySample(const struct sockaddr *remote, int sample_ms);
+// Should return the max packet size to use when sending to the given address
+size_t UTP_GetPacketSize(const struct sockaddr *remote);
+
diff --git a/third-party/libutp/utypes.h b/third-party/libutp/utypes.h
new file mode 100644 (file)
index 0000000..673554e
--- /dev/null
@@ -0,0 +1,42 @@
+#ifndef __UTYPES_H__
+#define __UTYPES_H__
+
+// standard types
+typedef unsigned char byte;
+typedef unsigned char uint8;
+typedef signed char int8;
+typedef unsigned short uint16;
+typedef signed short int16;
+typedef unsigned int uint;
+typedef unsigned int uint32;
+typedef signed int int32;
+
+#ifdef _MSC_VER
+typedef unsigned __int64 uint64;
+typedef signed __int64 int64;
+#else
+typedef unsigned long long uint64;
+typedef long long int64;
+#endif
+
+/* compile-time assert */
+#ifndef CASSERT
+#define CASSERT( exp, name ) typedef int is_not_##name [ (exp ) ? 1 : -1 ];
+#endif
+
+CASSERT(8 == sizeof(uint64), sizeof_uint64_is_8)
+CASSERT(8 == sizeof(int64), sizeof_int64_is_8)
+
+#ifndef INT64_MAX
+#define INT64_MAX 0x7fffffffffffffffLL
+#endif
+
+// always ANSI
+typedef const char * cstr;
+typedef char * str;
+
+#ifndef __cplusplus
+typedef uint8 bool;
+#endif
+
+#endif //__UTYPES_H__