twister-core/libtorrent/src/utp_stream.cpp
2013-10-15 23:21:08 -03:00

3357 lines
96 KiB
C++

/*
Copyright (c) 2009-2012, Arvid Norberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include "libtorrent/config.hpp"
#include "libtorrent/utp_stream.hpp"
#include "libtorrent/sliding_average.hpp"
#include "libtorrent/utp_socket_manager.hpp"
#include "libtorrent/alloca.hpp"
#include "libtorrent/timestamp_history.hpp"
#include "libtorrent/error.hpp"
#include "libtorrent/random.hpp"
#include "libtorrent/invariant_check.hpp"
#include <boost/cstdint.hpp>
#include <limits>
#define TORRENT_UTP_LOG 0
#define TORRENT_VERBOSE_UTP_LOG 0
#define TORRENT_UT_SEQ 1
#if TORRENT_UTP_LOG
#include <stdarg.h>
#include "libtorrent/socket_io.hpp"
#endif
namespace libtorrent {
#if TORRENT_UTP_LOG
char const* packet_type_names[] = { "ST_DATA", "ST_FIN", "ST_STATE", "ST_RESET", "ST_SYN" };
char const* socket_state_names[] = { "NONE", "SYN_SENT", "CONNECTED", "FIN_SENT", "ERROR", "DELETE" };
static struct utp_logger
{
FILE* utp_log_file;
mutex utp_log_mutex;
utp_logger() : utp_log_file(0)
{
utp_log_file = fopen("utp.log", "w+");
}
~utp_logger()
{
if (utp_log_file) fclose(utp_log_file);
}
} log_file_holder;
void utp_log(char const* fmt, ...)
{
mutex::scoped_lock lock(log_file_holder.utp_log_mutex);
static ptime start = time_now_hires();
fprintf(log_file_holder.utp_log_file, "[%012"PRId64"] ", total_microseconds(time_now_hires() - start));
va_list l;
va_start(l, fmt);
vfprintf(log_file_holder.utp_log_file, fmt, l);
va_end(l);
}
#define UTP_LOG utp_log
#if TORRENT_VERBOSE_UTP_LOG
#define UTP_LOGV utp_log
#else
#define UTP_LOGV if (false) printf
#endif
#else
#define UTP_LOG if (false) printf
#define UTP_LOGV if (false) printf
#endif
enum
{
ACK_MASK = 0xffff,
// the number of packets that'll fit in the reorder buffer
max_packets_reorder = 512,
// if a packet receives more than this number of
// duplicate acks, we'll trigger a fast re-send
dup_ack_limit = 3,
// the max number of packets to fast-resend per
// selective ack message
sack_resend_limit = 3,
};
// compare if lhs is less than rhs, taking wrapping
// into account. if lhs is close to UINT_MAX and rhs
// is close to 0, lhs is assumed to have wrapped and
// considered smaller
TORRENT_EXTRA_EXPORT bool compare_less_wrap(boost::uint32_t lhs, boost::uint32_t rhs, boost::uint32_t mask)
{
// distance walking from lhs to rhs, downwards
boost::uint32_t dist_down = (lhs - rhs) & mask;
// distance walking from lhs to rhs, upwards
boost::uint32_t dist_up = (rhs - lhs) & mask;
// if the distance walking up is shorter, lhs
// is less than rhs. If the distance walking down
// is shorter, then rhs is less than lhs
return dist_up < dist_down;
}
// used for out-of-order incoming packets
// as well as sent packets that are waiting to be ACKed
struct packet
{
// the last time this packet was sent
ptime send_time;
// the number of bytes actually allocated in 'buf'
boost::uint16_t allocated;
// the size of the buffer 'buf' points to
boost::uint16_t size;
// this is the offset to the payload inside the buffer
// this is also used as a cursor to describe where the
// next payload that hasn't been consumed yet starts
boost::uint16_t header_size;
// the number of times this packet has been sent
boost::uint8_t num_transmissions:6;
// true if we need to send this packet again. All
// outstanding packets are marked as needing to be
// resent on timeouts
bool need_resend:1;
// this is set to true for packets that were
// sent with the DF bit set (Don't Fragment)
bool mtu_probe:1;
// the actual packet buffer
boost::uint8_t buf[];
};
// since the uTP socket state may be needed after the
// utp_stream is closed, it's kept in a separate struct
// whose lifetime is not tied to the lifetime of utp_stream
// the utp socket is closely modelled after the asio async
// operations and handler model. For writing to the socket,
// the client provides a list of buffers (for gather/writev
// style of I/O) and whenever the socket can write another
// packet to the stream, it picks up data from these buffers.
// When all of the data has been written, or enough time has
// passed since we first started writing, the write handler
// is called and the write buffer is reset. This means that
// we're not writing anything at all while waiting for the
// client to re-issue a write request.
// reading is a little bit more complicated, since we must
// be able to receive data even when the user doesn't have
// an outstanding read operation on the socket. When the user
// does however, we want to receive data directly into the
// user's buffer instead of first copying it into our receive
// buffer. This is why the receive case is more complicated.
// There are two receive buffers. One provided by the user,
// which when present is always used. The other one is used
// when the user doesn't have an outstanding read request,
// and hence hasn't provided any buffer space to receive into.
// the user provided read buffer is called "m_read_buffer" and
// its size is "m_read_buffer_size". The buffer we spill over
// into when the user provided buffer is full or when there
// is none, is "m_receive_buffer" and "m_receive_buffer_size"
// respectively.
// in order to know when to trigger the read and write handlers
// there are two counters, m_read and m_written, which count
// the number of bytes we've stuffed into the user provided
// read buffer or written to the stream from the write buffer.
// These are used to trigger the handlers if we're written a
// large number of bytes. It's also triggered if we're filled
// the whole read buffer, or written the entire write buffer.
// The last way the handlers can be triggered is if we're read
// or written some, and enough time has elapsed since then.
// when we receive data into m_receive_buffer (i.e. the buffer
// used when there's no user provided one) is stored as a
// number of heap allocated packets. This is just because it's
// simple to reuse the data structured and it provides all the
// functionality needed for this buffer.
struct utp_socket_impl
{
utp_socket_impl(boost::uint16_t recv_id, boost::uint16_t send_id
, void* userdata, utp_socket_manager* sm)
: m_sm(sm)
, m_userdata(userdata)
, m_nagle_packet(NULL)
, m_read_handler(0)
, m_write_handler(0)
, m_connect_handler(0)
, m_remote_address()
, m_timeout(time_now_hires() + milliseconds(m_sm->connect_timeout()))
, m_last_cwnd_hit(time_now())
, m_last_history_step(time_now_hires())
, m_cwnd(TORRENT_ETHERNET_MTU << 16)
, m_buffered_incoming_bytes(0)
, m_reply_micro(0)
, m_adv_wnd(TORRENT_ETHERNET_MTU)
, m_bytes_in_flight(0)
, m_read(0)
, m_write_buffer_size(0)
, m_written(0)
, m_receive_buffer_size(0)
, m_read_buffer_size(0)
, m_in_buf_size(100 * 1024 * 1024)
, m_in_packets(0)
, m_out_packets(0)
, m_send_delay(0)
, m_recv_delay(0)
, m_port(0)
, m_send_id(send_id)
, m_recv_id(recv_id)
, m_ack_nr(0)
, m_seq_nr(0)
, m_acked_seq_nr(0)
, m_fast_resend_seq_nr(0)
, m_eof_seq_nr(0)
, m_loss_seq_nr(0)
, m_mtu(TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER - 8 - 24 - 36)
, m_mtu_floor(TORRENT_INET_MIN_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)
, m_mtu_ceiling(TORRENT_ETHERNET_MTU - TORRENT_IPV4_HEADER - TORRENT_UDP_HEADER)
, m_mtu_seq(0)
, m_duplicate_acks(0)
, m_num_timeouts(0)
, m_delay_sample_idx(0)
, m_state(UTP_STATE_NONE)
, m_eof(false)
, m_attached(true)
, m_nagle(true)
, m_slow_start(true)
, m_cwnd_full(false)
, m_deferred_ack(false)
, m_subscribe_drained(false)
, m_stalled(false)
{
TORRENT_ASSERT(m_userdata);
for (int i = 0; i != num_delay_hist; ++i)
m_delay_sample_hist[i] = UINT_MAX;
}
~utp_socket_impl();
void tick(ptime const& now);
void init_mtu(int link_mtu, int utp_mtu);
bool incoming_packet(boost::uint8_t const* buf, int size
, udp::endpoint const& ep, ptime receive_time);
void writable();
bool should_delete() const;
tcp::endpoint remote_endpoint(error_code& ec) const
{
if (m_state == UTP_STATE_NONE)
ec = asio::error::not_connected;
else
TORRENT_ASSERT(m_remote_address != address_v4::any());
return tcp::endpoint(m_remote_address, m_port);
}
std::size_t available() const;
// returns true if there were handlers cancelled
// if it returns false, we can detach immediately
bool destroy();
void detach();
void send_syn();
void send_fin();
void subscribe_drained();
void defer_ack();
void remove_sack_header(packet* p);
enum packet_flags_t { pkt_ack = 1, pkt_fin = 2 };
bool send_pkt(int flags = 0);
bool resend_packet(packet* p, bool fast_resend = false);
void send_reset(utp_header* ph);
void parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr, int size, int* acked_bytes
, ptime const now, boost::uint32_t& min_rtt);
void write_payload(boost::uint8_t* ptr, int size);
void maybe_inc_acked_seq_nr();
void ack_packet(packet* p, ptime const& receive_time
, boost::uint32_t& min_rtt, boost::uint16_t seq_nr);
void write_sack(boost::uint8_t* buf, int size) const;
void incoming(boost::uint8_t const* buf, int size, packet* p, ptime now);
void do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now);
int packet_timeout() const;
bool test_socket_state();
void maybe_trigger_receive_callback();
void maybe_trigger_send_callback();
bool cancel_handlers(error_code const& ec, bool kill);
bool consume_incoming_data(
utp_header const* ph, boost::uint8_t const* ptr, int payload_size, ptime now);
void update_mtu_limits();
void experienced_loss(int seq_nr);
void check_receive_buffers() const;
#if defined TORRENT_DEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
void check_invariant() const;
#endif
utp_socket_manager* m_sm;
// userdata pointer passed along
// with any callback. This is initialized to 0
// then set to point to the utp_stream when
// hooked up, and then reset to 0 once the utp_stream
// detaches. This is used to know whether or not
// the socket impl is still attached to a utp_stream
// object. When it isn't, we'll never be able to
// signal anything back to the client, and in case
// of errors, we just have to delete ourselves
// i.e. transition to the UTP_STATE_DELETED state
void* m_userdata;
// This is a platform-independent replacement
// for the regular iovec type in posix. Since
// it's not used in any system call, we might as
// well define our own type instead of wrapping
// the system's type.
struct iovec_t
{
iovec_t(void* b, size_t l): buf(b), len(l) {}
void* buf;
size_t len;
};
// if there's currently an async read or write
// operation in progress, these buffers are initialized
// and used, otherwise any bytes received are stuck in
// m_receive_buffer until another read is made
// as we flush from the write buffer, individual iovecs
// are updated to only refer to unflushed portions of the
// buffers. Buffers that empty are erased from the vector.
std::vector<iovec_t> m_write_buffer;
// if this is non NULL, it's a packet. This packet was held off because
// of NAGLE. We couldn't send it immediately. It's left
// here to accrue more bytes before we send it.
packet* m_nagle_packet;
// the user provided read buffer. If this has a size greater
// than 0, we'll always prefer using it over putting received
// data in the m_receive_buffer. As data is stored in the
// read buffer, the iovec_t elements are adjusted to only
// refer to the unwritten portions of the buffers, and the
// ones that fill up are erased from the vector
std::vector<iovec_t> m_read_buffer;
// packets we've received without a read operation
// active. Store them here until the client triggers
// an async_read_some
std::vector<packet*> m_receive_buffer;
// this is the error on this socket. If m_state is
// set to UTP_STATE_ERROR_WAIT, this error should be
// forwarded to the client as soon as we have a new
// async operation initiated
error_code m_error;
// these are the callbacks made into the utp_stream object
// on read/write/connect events
utp_stream::handler_t m_read_handler;
utp_stream::handler_t m_write_handler;
utp_stream::connect_handler_t m_connect_handler;
// the address of the remote endpoint
address m_remote_address;
// the local address
address m_local_address;
// the send and receive buffers
// maps packet sequence numbers
packet_buffer m_inbuf;
packet_buffer m_outbuf;
// the time when the last packet we sent times out. Including re-sends.
// if we ever end up not having sent anything in one second (
// or one mean rtt + 2 average deviations, whichever is greater)
// we set our cwnd to 1 MSS. This condition can happen either because
// a packet has timed out and needs to be resent or because our
// cwnd is set to less than one MSS during congestion control.
// it can also happen if the other end sends an advertized window
// size less than one MSS.
ptime m_timeout;
// the last time we wanted to send more data, but couldn't because
// it would bring the number of outstanding bytes above the cwnd.
// this is used to restrict increasing the cwnd size when we're
// not sending fast enough to need it bigger
ptime m_last_cwnd_hit;
// the last time we stepped the timestamp history
ptime m_last_history_step;
// the max number of bytes in-flight. This is a fixed point
// value, to get the true number of bytes, shift right 16 bits
// the value is always >= 0, but the calculations performed on
// it in do_ledbat() are signed.
boost::int64_t m_cwnd;
timestamp_history m_delay_hist;
timestamp_history m_their_delay_hist;
// the number of bytes we have buffered in m_inbuf
boost::int32_t m_buffered_incoming_bytes;
// the timestamp diff in the last packet received
// this is what we'll send back
boost::uint32_t m_reply_micro;
// this is the advertized receive window the other end sent
// we'll never have more un-acked bytes in flight
// if this ever gets set to zero, we'll try one packet every
// second until the window opens up again
boost::uint32_t m_adv_wnd;
// the number of un-acked bytes we have sent
boost::int32_t m_bytes_in_flight;
// the number of bytes read into the user provided
// buffer. If this grows too big, we'll trigger the
// read handler.
boost::int32_t m_read;
// the sum of the lengths of all iovec in m_write_buffer
boost::int32_t m_write_buffer_size;
// the number of bytes already written to packets
// from m_write_buffer
boost::int32_t m_written;
// the sum of all packets stored in m_receive_buffer
boost::int32_t m_receive_buffer_size;
// the sum of all buffers in m_read_buffer
boost::int32_t m_read_buffer_size;
// max number of bytes to allocate for receive buffer
boost::int32_t m_in_buf_size;
// this holds the 3 last delay measurements,
// these are the actual corrected delay measurements.
// the lowest of the 3 last ones is used in the congestion
// controller. This is to not completely close the cwnd
// by a single outlier.
enum { num_delay_hist = 3 };
boost::uint32_t m_delay_sample_hist[num_delay_hist];
// counters
boost::uint32_t m_in_packets;
boost::uint32_t m_out_packets;
// the last send delay sample
boost::int32_t m_send_delay;
// the last receive delay sample
boost::int32_t m_recv_delay;
// average RTT
sliding_average<16> m_rtt;
// port of destination endpoint
boost::uint16_t m_port;
boost::uint16_t m_send_id;
boost::uint16_t m_recv_id;
// this is the ack we're sending back. We have
// received all packets up to this sequence number
boost::uint16_t m_ack_nr;
// the sequence number of the next packet
// we'll send
boost::uint16_t m_seq_nr;
// this is the sequence number of the packet that
// everything has been ACKed up to. Everything we've
// sent up to this point has been received by the other
// end.
boost::uint16_t m_acked_seq_nr;
// each packet gets one chance of "fast resend". i.e.
// if we have multiple duplicate acks, we may send a
// packet immediately, if m_fast_resend_seq_nr is set
// to that packet's sequence number
boost::uint16_t m_fast_resend_seq_nr;
// this is the sequence number of the FIN packet
// we've received. This sequence number is only
// valid if m_eof is true. We should not accept
// any packets beyond this sequence number from the
// other end
boost::uint16_t m_eof_seq_nr;
// this is the lowest sequence number that, when lost,
// will cause the window size to be cut in half
boost::uint16_t m_loss_seq_nr;
// the max number of bytes we can send in a packet
// including the header
boost::uint16_t m_mtu;
// the floor is the largest packet that we have
// been able to get through without fragmentation
boost::uint16_t m_mtu_floor;
// the ceiling is the largest packet that we might
// be able to get through without fragmentation.
// i.e. ceiling +1 is very likely to not get through
// or we have in fact experienced a drop or ICMP
// message indicating that it is
boost::uint16_t m_mtu_ceiling;
// the sequence number of the probe in-flight
// this is 0 if there is no probe in flight
boost::uint16_t m_mtu_seq;
// this is a counter of how many times the current m_acked_seq_nr
// has been ACKed. If it's ACKed more than 3 times, we assume the
// packet with the next sequence number has been lost, and we trigger
// a re-send. Ovbiously an ACK only counts as a duplicate as long as
// we have outstanding packets following it.
boost::uint8_t m_duplicate_acks;
// the number of packet timeouts we've seen in a row
// this affects the packet timeout time
boost::uint8_t m_num_timeouts;
enum state_t {
// not yet connected
UTP_STATE_NONE,
// sent a syn packet, not received any acks
UTP_STATE_SYN_SENT,
// syn-ack received and in normal operation
// of sending and receiving data
UTP_STATE_CONNECTED,
// fin sent, but all packets up to the fin packet
// have not yet been acked. We might still be waiting
// for a FIN from the other end
UTP_STATE_FIN_SENT,
// ====== states beyond this point =====
// === are considered closing states ===
// === and will cause the socket to ====
// ============ be deleted =============
// the socket has been gracefully disconnected
// and is waiting for the client to make a
// socket call so that we can communicate this
// fact and actually delete all the state, or
// there is an error on this socket and we're
// waiting to communicate this to the client in
// a callback. The error in either case is stored
// in m_error. If the socket has gracefully shut
// down, the error is error::eof.
UTP_STATE_ERROR_WAIT,
// there are no more references to this socket
// and we can delete it
UTP_STATE_DELETE
};
// this is the cursor into m_delay_sample_hist
boost::uint8_t m_delay_sample_idx:2;
// the state the socket is in
boost::uint8_t m_state:3;
// this is set to true when we receive a fin
bool m_eof:1;
// is this socket state attached to a user space socket?
bool m_attached:1;
// this is true if nagle is enabled (which it is by default)
// TODO: 2 support the option to turn it off
bool m_nagle:1;
// this is true while the socket is in slow start mode. It's
// only in slow-start during the start-up phase. Slow start
// (contrary to what its name suggest) means that we're growing
// the congestion window (cwnd) exponetially rather than linearly.
// this is done at startup of a socket in order to find its
// link capacity faster. This behaves similar to TCP slow start
bool m_slow_start:1;
// this is true as long as we have as many packets in
// flight as allowed by the congestion window (cwnd)
bool m_cwnd_full:1;
// this is set to true when this socket has added itself to
// the utp socket manager's list of deferred acks. Once the
// burst of incoming UDP packets is all drained, the utp socket
// manager will send acks for all sockets on this list.
bool m_deferred_ack:1;
// this is true if this socket has subscribed to be notified
// when this receive round is done
bool m_subscribe_drained:1;
// if this socket tries to send a packet via the utp socket
// manager, and it fails with EWOULDBLOCK, the socket
// is stalled and this is set. It's also added to a list
// of sockets in the utp_socket_manager to be notified of
// the socket being writable again
bool m_stalled:1;
};
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
int socket_impl_size() { return sizeof(utp_socket_impl); }
#endif
utp_socket_impl* construct_utp_impl(boost::uint16_t recv_id
, boost::uint16_t send_id, void* userdata
, utp_socket_manager* sm)
{
return new utp_socket_impl(recv_id, send_id, userdata, sm);
}
void detach_utp_impl(utp_socket_impl* s)
{
s->detach();
}
void delete_utp_impl(utp_socket_impl* s)
{
delete s;
}
bool should_delete(utp_socket_impl* s)
{
return s->should_delete();
}
void tick_utp_impl(utp_socket_impl* s, ptime const& now)
{
s->tick(now);
}
void utp_init_mtu(utp_socket_impl* s, int link_mtu, int utp_mtu)
{
s->init_mtu(link_mtu, utp_mtu);
}
bool utp_incoming_packet(utp_socket_impl* s, char const* p
, int size, udp::endpoint const& ep, ptime receive_time)
{
return s->incoming_packet((boost::uint8_t const*)p, size, ep, receive_time);
}
bool utp_match(utp_socket_impl* s, udp::endpoint const& ep, boost::uint16_t id)
{
return s->m_remote_address == ep.address()
&& s->m_port == ep.port()
&& s->m_recv_id == id;
}
udp::endpoint utp_remote_endpoint(utp_socket_impl* s)
{
return udp::endpoint(s->m_remote_address, s->m_port);
}
boost::uint16_t utp_receive_id(utp_socket_impl* s)
{
return s->m_recv_id;
}
void utp_writable(utp_socket_impl* s)
{
TORRENT_ASSERT(s->m_stalled);
s->m_stalled = false;
s->writable();
}
void utp_send_ack(utp_socket_impl* s)
{
TORRENT_ASSERT(s->m_deferred_ack);
s->m_deferred_ack = false;
s->send_pkt(utp_socket_impl::pkt_ack);
}
void utp_socket_drained(utp_socket_impl* s)
{
s->m_subscribe_drained = false;
// at this point, we know we won't receive any
// more packets this round. So, we may want to
// call the receive callback function to
// let the user consume it
s->maybe_trigger_receive_callback();
s->maybe_trigger_send_callback();
}
void utp_socket_impl::update_mtu_limits()
{
INVARIANT_CHECK;
TORRENT_ASSERT(m_mtu_floor <= m_mtu_ceiling);
m_mtu = (m_mtu_floor + m_mtu_ceiling) / 2;
if ((m_cwnd >> 16) < m_mtu) m_cwnd = boost::int64_t(m_mtu) << 16;
UTP_LOGV("%8p: updating MTU to: %d [%d, %d]\n"
, this, m_mtu, m_mtu_floor, m_mtu_ceiling);
// clear the mtu probe sequence number since
// it was either dropped or acked
m_mtu_seq = 0;
}
int utp_socket_state(utp_socket_impl const* s)
{
return s->m_state;
}
int utp_stream::send_delay() const
{
return m_impl ? m_impl->m_send_delay : 0;
}
int utp_stream::recv_delay() const
{
return m_impl ? m_impl->m_recv_delay : 0;
}
utp_stream::utp_stream(asio::io_service& io_service)
: m_io_service(io_service)
, m_impl(0)
, m_open(false)
{
}
utp_socket_impl* utp_stream::get_impl()
{
return m_impl;
}
void utp_stream::close()
{
if (!m_impl) return;
if (!m_impl->destroy())
{
if (!m_impl) return;
detach_utp_impl(m_impl);
m_impl = 0;
}
}
std::size_t utp_stream::available() const
{
return m_impl->available();
}
utp_stream::endpoint_type utp_stream::remote_endpoint(error_code& ec) const
{
if (!m_impl)
{
ec = asio::error::not_connected;
return endpoint_type();
}
return m_impl->remote_endpoint(ec);
}
utp_stream::endpoint_type utp_stream::local_endpoint(error_code& ec) const
{
if (m_impl == 0 || m_impl->m_sm == 0)
{
ec = asio::error::not_connected;
return endpoint_type();
}
return tcp::endpoint(m_impl->m_local_address, m_impl->m_sm->local_port(ec));
}
utp_stream::~utp_stream()
{
if (m_impl)
{
UTP_LOGV("%8p: utp_stream destructed\n", m_impl);
m_impl->destroy();
detach_utp_impl(m_impl);
}
m_impl = 0;
}
void utp_stream::set_impl(utp_socket_impl* impl)
{
TORRENT_ASSERT(m_impl == 0);
TORRENT_ASSERT(!m_open);
m_impl = impl;
m_open = true;
}
int utp_stream::read_buffer_size() const
{
TORRENT_ASSERT(m_impl);
return m_impl->m_receive_buffer_size;
}
void utp_stream::on_read(void* self, size_t bytes_transferred, error_code const& ec, bool kill)
{
utp_stream* s = (utp_stream*)self;
UTP_LOGV("%8p: calling read handler read:%d ec:%s kill:%d\n", s->m_impl
, int(bytes_transferred), ec.message().c_str(), kill);
TORRENT_ASSERT(s->m_read_handler);
TORRENT_ASSERT(bytes_transferred > 0 || ec);
s->m_io_service.post(boost::bind<void>(s->m_read_handler, ec, bytes_transferred));
s->m_read_handler.clear();
if (kill && s->m_impl)
{
detach_utp_impl(s->m_impl);
s->m_impl = 0;
}
}
void utp_stream::on_write(void* self, size_t bytes_transferred, error_code const& ec, bool kill)
{
utp_stream* s = (utp_stream*)self;
UTP_LOGV("%8p: calling write handler written:%d ec:%s kill:%d\n", s->m_impl
, int(bytes_transferred), ec.message().c_str(), kill);
TORRENT_ASSERT(s->m_write_handler);
TORRENT_ASSERT(bytes_transferred > 0 || ec);
s->m_io_service.post(boost::bind<void>(s->m_write_handler, ec, bytes_transferred));
s->m_write_handler.clear();
if (kill && s->m_impl)
{
detach_utp_impl(s->m_impl);
s->m_impl = 0;
}
}
void utp_stream::on_connect(void* self, error_code const& ec, bool kill)
{
utp_stream* s = (utp_stream*)self;
TORRENT_ASSERT(s);
UTP_LOGV("%8p: calling connect handler ec:%s kill:%d\n"
, s->m_impl, ec.message().c_str(), kill);
TORRENT_ASSERT(s->m_connect_handler);
s->m_io_service.post(boost::bind<void>(s->m_connect_handler, ec));
s->m_connect_handler.clear();
if (kill && s->m_impl)
{
detach_utp_impl(s->m_impl);
s->m_impl = 0;
}
}
void utp_stream::add_read_buffer(void* buf, size_t len)
{
TORRENT_ASSERT(m_impl);
TORRENT_ASSERT(len < INT_MAX);
TORRENT_ASSERT(len > 0);
TORRENT_ASSERT(buf);
m_impl->m_read_buffer.push_back(utp_socket_impl::iovec_t(buf, len));
m_impl->m_read_buffer_size += len;
UTP_LOGV("%8p: add_read_buffer %d bytes\n", m_impl, int(len));
}
// this is the wrapper to add a user provided write buffer to the
// utp_socket_impl. It makes sure the m_write_buffer_size is kept
// up to date
void utp_stream::add_write_buffer(void const* buf, size_t len)
{
TORRENT_ASSERT(m_impl);
TORRENT_ASSERT(len < INT_MAX);
TORRENT_ASSERT(len > 0);
TORRENT_ASSERT(buf);
#ifdef TORRENT_DEBUG
int write_buffer_size = 0;
for (std::vector<utp_socket_impl::iovec_t>::iterator i = m_impl->m_write_buffer.begin()
, end(m_impl->m_write_buffer.end()); i != end; ++i)
{
write_buffer_size += i->len;
}
TORRENT_ASSERT(m_impl->m_write_buffer_size == write_buffer_size);
#endif
m_impl->m_write_buffer.push_back(utp_socket_impl::iovec_t((void*)buf, len));
m_impl->m_write_buffer_size += len;
#ifdef TORRENT_DEBUG
write_buffer_size = 0;
for (std::vector<utp_socket_impl::iovec_t>::iterator i = m_impl->m_write_buffer.begin()
, end(m_impl->m_write_buffer.end()); i != end; ++i)
{
write_buffer_size += i->len;
}
TORRENT_ASSERT(m_impl->m_write_buffer_size == write_buffer_size);
#endif
UTP_LOGV("%8p: add_write_buffer %d bytes\n", m_impl, int(len));
}
// this is called when all user provided read buffers have been added
// and it's time to execute the async operation. The first thing we
// do is to copy any data stored in m_receive_buffer into the user
// provided buffer. This might be enough to in turn trigger the read
// handler immediately.
void utp_stream::set_read_handler(handler_t h)
{
TORRENT_ASSERT(m_impl->m_userdata);
m_impl->m_read_handler = h;
if (m_impl->test_socket_state()) return;
UTP_LOGV("%8p: new read handler. %d bytes in buffer\n"
, m_impl, m_impl->m_receive_buffer_size);
TORRENT_ASSERT(m_impl->m_read_buffer_size > 0);
// so, the client wants to read. If we already
// have some data in the read buffer, move it into the
// client's buffer right away
m_impl->m_read += read_some(false);
m_impl->maybe_trigger_receive_callback();
}
size_t utp_stream::read_some(bool clear_buffers)
{
if (m_impl->m_receive_buffer_size == 0)
{
if (clear_buffers)
{
m_impl->m_read_buffer_size = 0;
m_impl->m_read_buffer.clear();
}
return 0;
}
std::vector<utp_socket_impl::iovec_t>::iterator target = m_impl->m_read_buffer.begin();
size_t ret = 0;
int pop_packets = 0;
for (std::vector<packet*>::iterator i = m_impl->m_receive_buffer.begin()
, end(m_impl->m_receive_buffer.end()); i != end;)
{
if (target == m_impl->m_read_buffer.end())
{
UTP_LOGV(" No more target buffers: %d bytes left in buffer\n"
, m_impl->m_receive_buffer_size);
TORRENT_ASSERT(m_impl->m_read_buffer.empty());
break;
}
m_impl->check_receive_buffers();
packet* p = *i;
int to_copy = (std::min)(p->size - p->header_size, int(target->len));
TORRENT_ASSERT(to_copy >= 0);
memcpy(target->buf, p->buf + p->header_size, to_copy);
ret += to_copy;
target->buf = ((char*)target->buf) + to_copy;
TORRENT_ASSERT(int(target->len) >= to_copy);
target->len -= to_copy;
m_impl->m_receive_buffer_size -= to_copy;
TORRENT_ASSERT(m_impl->m_read_buffer_size >= to_copy);
m_impl->m_read_buffer_size -= to_copy;
p->header_size += to_copy;
if (target->len == 0) target = m_impl->m_read_buffer.erase(target);
m_impl->check_receive_buffers();
TORRENT_ASSERT(m_impl->m_receive_buffer_size >= 0);
// Consumed entire packet
if (p->header_size == p->size)
{
free(p);
++pop_packets;
*i = 0;
++i;
}
if (m_impl->m_receive_buffer_size == 0)
{
UTP_LOGV(" Didn't fill entire target: %d bytes left in buffer\n"
, m_impl->m_receive_buffer_size);
break;
}
}
// remove the packets from the receive_buffer that we already copied over
// and freed
m_impl->m_receive_buffer.erase(m_impl->m_receive_buffer.begin()
, m_impl->m_receive_buffer.begin() + pop_packets);
// we exited either because we ran out of bytes to copy
// or because we ran out of space to copy the bytes to
TORRENT_ASSERT(m_impl->m_receive_buffer_size == 0
|| m_impl->m_read_buffer.empty());
UTP_LOGV("%8p: %d packets moved from buffer to user space (%d bytes)\n"
, m_impl, pop_packets, int(ret));
if (clear_buffers)
{
m_impl->m_read_buffer_size = 0;
m_impl->m_read_buffer.clear();
}
TORRENT_ASSERT(ret > 0);
return ret;
}
// this is called when all user provided write buffers have been
// added. Start trying to send packets with the payload immediately.
void utp_stream::set_write_handler(handler_t h)
{
UTP_LOGV("%8p: new write handler. %d bytes to write\n"
, m_impl, m_impl->m_write_buffer_size);
TORRENT_ASSERT(m_impl->m_write_buffer_size > 0);
TORRENT_ASSERT(m_impl->m_userdata);
m_impl->m_write_handler = h;
m_impl->m_written = 0;
if (m_impl->test_socket_state()) return;
// try to write. send_pkt returns false if there's
// no more payload to send or if the congestion window
// is full and we can't send more packets right now
while (m_impl->send_pkt());
// if there was an error in send_pkt(), m_impl may be
// 0 at this point
if (m_impl) m_impl->maybe_trigger_send_callback();
}
void utp_stream::do_connect(tcp::endpoint const& ep, utp_stream::connect_handler_t handler)
{
int link_mtu, utp_mtu;
m_impl->m_sm->mtu_for_dest(ep.address(), link_mtu, utp_mtu);
m_impl->init_mtu(link_mtu, utp_mtu);
TORRENT_ASSERT(m_impl->m_connect_handler == 0);
m_impl->m_remote_address = ep.address();
m_impl->m_port = ep.port();
m_impl->m_connect_handler = handler;
error_code ec;
m_impl->m_local_address = m_impl->m_sm->local_endpoint(m_impl->m_remote_address, ec).address();
if (m_impl->test_socket_state()) return;
m_impl->send_syn();
}
// =========== utp_socket_impl ============
utp_socket_impl::~utp_socket_impl()
{
INVARIANT_CHECK;
TORRENT_ASSERT(!m_attached);
TORRENT_ASSERT(!m_deferred_ack);
UTP_LOGV("%8p: destroying utp socket state\n", this);
// free any buffers we're holding
for (boost::uint16_t i = m_inbuf.cursor(), end((m_inbuf.cursor()
+ m_inbuf.capacity()) & ACK_MASK);
i != end; i = (i + 1) & ACK_MASK)
{
void* p = m_inbuf.remove(i);
free(p);
}
for (boost::uint16_t i = m_outbuf.cursor(), end((m_outbuf.cursor()
+ m_outbuf.capacity()) & ACK_MASK);
i != end; i = (i + 1) & ACK_MASK)
{
void* p = m_outbuf.remove(i);
free(p);
}
for (std::vector<packet*>::iterator i = m_receive_buffer.begin()
, end = m_receive_buffer.end(); i != end; ++i)
{
free(*i);
}
free(m_nagle_packet);
m_nagle_packet = NULL;
}
bool utp_socket_impl::should_delete() const
{
INVARIANT_CHECK;
// if the socket state is not attached anymore we're free
// to delete it from the client's point of view. The other
// endpoint however might still need to be told that we're
// closing the socket. Only delete the state if we're not
// attached and we're in a state where the other end doesn't
// expect the socket to still be alive
// when m_stalled is true, it means the socket manager has a
// pointer to this socket, waiting for the UDP socket to
// become writable again. We have to wait for that, so that
// the pointer is removed from that queue. Otherwise we would
// leave a dangling pointer in the socket manager
bool ret = (m_state >= UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_NONE)
&& !m_attached && !m_stalled;
if (ret)
{
UTP_LOGV("%8p: should_delete() = true\n", this);
}
return ret;
}
void utp_socket_impl::maybe_trigger_receive_callback()
{
INVARIANT_CHECK;
// nothing has been read or there's no outstanding read operation
if (m_read == 0 || m_read_handler == 0) return;
UTP_LOGV("%8p: calling read handler read:%d\n", this, m_read);
m_read_handler(m_userdata, m_read, m_error, false);
m_read_handler = 0;
m_read = 0;
m_read_buffer_size = 0;
m_read_buffer.clear();
}
void utp_socket_impl::maybe_trigger_send_callback()
{
INVARIANT_CHECK;
// nothing has been written or there's no outstanding write operation
if (m_written == 0 || m_write_handler == 0) return;
UTP_LOGV("%8p: calling write handler written:%d\n", this, m_written);
m_write_handler(m_userdata, m_written, m_error, false);
m_write_handler = 0;
m_written = 0;
m_write_buffer_size = 0;
m_write_buffer.clear();
}
bool utp_socket_impl::destroy()
{
INVARIANT_CHECK;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: destroy state:%s\n", this, socket_state_names[m_state]);
#endif
if (m_userdata == 0) return false;
if (m_state == UTP_STATE_CONNECTED)
send_fin();
bool cancelled = cancel_handlers(asio::error::operation_aborted, true);
m_userdata = 0;
m_read_buffer.clear();
m_read_buffer_size = 0;
m_write_buffer.clear();
m_write_buffer_size = 0;
if ((m_state == UTP_STATE_ERROR_WAIT
|| m_state == UTP_STATE_NONE
|| m_state == UTP_STATE_SYN_SENT) && cancelled)
{
m_state = UTP_STATE_DELETE;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]);
#endif
}
return cancelled;
// #error our end is closing. Wait for everything to be acked
}
void utp_socket_impl::detach()
{
INVARIANT_CHECK;
UTP_LOGV("%8p: detach()\n", this);
m_attached = false;
}
void utp_socket_impl::send_syn()
{
INVARIANT_CHECK;
m_seq_nr = random();
m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK;
m_loss_seq_nr = m_acked_seq_nr;
m_ack_nr = 0;
m_fast_resend_seq_nr = m_seq_nr;
packet* p = (packet*)malloc(sizeof(packet) + sizeof(utp_header));
p->size = sizeof(utp_header);
p->header_size = sizeof(utp_header);
p->num_transmissions = 0;
p->need_resend = false;
utp_header* h = (utp_header*)p->buf;
h->type_ver = (ST_SYN << 4) | 1;
h->extension = 0;
// using recv_id here is intentional! This is an odd
// thing in uTP. The syn packet is sent with the connection
// ID that it expects to receive the syn ack on. All
// subsequent connection IDs will be this plus one.
h->connection_id = m_recv_id;
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = 0;
h->seq_nr = m_seq_nr;
h->ack_nr = 0;
ptime now = time_now_hires();
p->send_time = now;
h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time()));
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: send_syn seq_nr:%d id:%d target:%s\n"
, this, int(m_seq_nr), int(m_recv_id)
, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str());
#endif
error_code ec;
m_sm->send_packet(udp::endpoint(m_remote_address, m_port), (char const*)h
, sizeof(utp_header), ec);
if (ec == error::would_block || ec == error::try_again)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: socket stalled\n", this);
#endif
if (!m_stalled)
{
m_stalled = true;
m_sm->subscribe_writable(this);
}
}
else if (ec)
{
free(p);
m_error = ec;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return;
}
if (!m_stalled)
++p->num_transmissions;
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
m_outbuf.insert(m_seq_nr, p);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
TORRENT_ASSERT(p->buf == (boost::uint8_t*)h);
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
TORRENT_ASSERT(!m_error);
m_state = UTP_STATE_SYN_SENT;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]);
#endif
}
// if a send ever failed with EWOULDBLOCK, we
// subscribe to the udp socket and will be
// signalled with this function.
void utp_socket_impl::writable()
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: writable\n", this);
#endif
if (should_delete()) return;
while(send_pkt());
maybe_trigger_send_callback();
}
void utp_socket_impl::send_fin()
{
INVARIANT_CHECK;
send_pkt(pkt_fin);
// unless there was an error, we're now
// in FIN-SENT state
if (!m_error)
m_state = UTP_STATE_FIN_SENT;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]);
#endif
}
void utp_socket_impl::send_reset(utp_header* ph)
{
INVARIANT_CHECK;
utp_header h;
h.type_ver = (ST_RESET << 4) | 1;
h.extension = 0;
h.connection_id = m_send_id;
h.timestamp_difference_microseconds = m_reply_micro;
h.wnd_size = 0;
h.seq_nr = random();
h.ack_nr = ph->seq_nr;
ptime now = time_now_hires();
h.timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time()));
UTP_LOGV("%8p: send_reset seq_nr:%d id:%d ack_nr:%d\n"
, this, int(h.seq_nr), int(m_send_id), int(ph->seq_nr));
// ignore errors here
error_code ec;
m_sm->send_packet(udp::endpoint(m_remote_address, m_port), (char const*)&h, sizeof(h), ec);
}
std::size_t utp_socket_impl::available() const
{
return m_receive_buffer_size;
}
void utp_socket_impl::parse_sack(boost::uint16_t packet_ack, boost::uint8_t const* ptr
, int size, int* acked_bytes, ptime const now, boost::uint32_t& min_rtt)
{
INVARIANT_CHECK;
if (size == 0) return;
// this is the sequence number the current bit represents
int ack_nr = (packet_ack + 2) & ACK_MASK;
#if TORRENT_VERBOSE_UTP_LOG
std::string bitmask;
bitmask.reserve(size);
for (boost::uint8_t const* b = ptr, *end = ptr + size; b != end; ++b)
{
unsigned char bitfield = unsigned(*b);
unsigned char mask = 1;
// for each bit
for (int i = 0; i < 8; ++i)
{
bitmask += (mask & bitfield) ? "1" : "0";
mask <<= 1;
}
}
UTP_LOGV("%8p: got SACK first:%d %s our_seq_nr:%u\n"
, this, ack_nr, bitmask.c_str(), m_seq_nr);
#endif
// the number of acked packets past the fast re-send sequence number
// this is used to determine if we should trigger more fast re-sends
int dups = 0;
// the sequence number of the last ACKed packet
int last_ack = packet_ack;
// for each byte
for (boost::uint8_t const* end = ptr + size; ptr != end; ++ptr)
{
unsigned char bitfield = unsigned(*ptr);
unsigned char mask = 1;
// for each bit
for (int i = 0; i < 8; ++i)
{
if (mask & bitfield)
{
last_ack = ack_nr;
if (m_fast_resend_seq_nr == ack_nr)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
if (compare_less_wrap(m_fast_resend_seq_nr, ack_nr, ACK_MASK)) ++dups;
// this bit was set, ack_nr was received
packet* p = (packet*)m_outbuf.remove(ack_nr);
if (p)
{
*acked_bytes += p->size - p->header_size;
// each ACKed packet counts as a duplicate ack
UTP_LOGV("%8p: duplicate_acks:%u fast_resend_seq_nr:%u\n"
, this, m_duplicate_acks, m_fast_resend_seq_nr);
ack_packet(p, now, min_rtt, ack_nr);
}
else
{
// this packet might have been acked by a previous
// selective ack
maybe_inc_acked_seq_nr();
}
}
mask <<= 1;
ack_nr = (ack_nr + 1) & ACK_MASK;
// we haven't sent packets past this point.
// if there are any more bits set, we have to
// ignore them anyway
if (ack_nr == m_seq_nr) break;
}
if (ack_nr == m_seq_nr) break;
}
TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1);
// we received more than dup_ack_limit ACKs in this SACK message.
// trigger fast re-send
if (dups >= dup_ack_limit && compare_less_wrap(m_fast_resend_seq_nr, last_ack, ACK_MASK))
{
experienced_loss(m_fast_resend_seq_nr);
int num_resent = 0;
for (; m_fast_resend_seq_nr != last_ack; m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK)
{
packet* p = (packet*)m_outbuf.at(m_fast_resend_seq_nr);
if (!p) continue;
++num_resent;
if (!resend_packet(p, true)) break;
m_duplicate_acks = 0;
if (num_resent >= sack_resend_limit) break;
}
}
}
// copies data from the write buffer into the packet
// pointed to by ptr
void utp_socket_impl::write_payload(boost::uint8_t* ptr, int size)
{
INVARIANT_CHECK;
#ifdef TORRENT_DEBUG
int write_buffer_size = 0;
for (std::vector<iovec_t>::iterator i = m_write_buffer.begin()
, end(m_write_buffer.end()); i != end; ++i)
{
write_buffer_size += i->len;
}
TORRENT_ASSERT(m_write_buffer_size == write_buffer_size);
#endif
TORRENT_ASSERT(!m_write_buffer.empty() || size == 0);
TORRENT_ASSERT(m_write_buffer_size >= size);
std::vector<iovec_t>::iterator i = m_write_buffer.begin();
if (size == 0) return;
int buffers_to_clear = 0;
while (size > 0)
{
// i points to the iovec we'll start copying from
int to_copy = (std::min)(size, int(i->len));
TORRENT_ASSERT(to_copy >= 0);
TORRENT_ASSERT(to_copy < INT_MAX / 2 && m_written < INT_MAX / 2);
memcpy(ptr, static_cast<char const*>(i->buf), to_copy);
size -= to_copy;
m_written += to_copy;
ptr += to_copy;
i->len -= to_copy;
TORRENT_ASSERT(m_write_buffer_size >= to_copy);
m_write_buffer_size -= to_copy;
((char const*&)i->buf) += to_copy;
if (i->len == 0) ++buffers_to_clear;
++i;
}
if (buffers_to_clear)
m_write_buffer.erase(m_write_buffer.begin()
, m_write_buffer.begin() + buffers_to_clear);
#ifdef TORRENT_DEBUG
write_buffer_size = 0;
for (std::vector<iovec_t>::iterator i = m_write_buffer.begin()
, end(m_write_buffer.end()); i != end; ++i)
{
write_buffer_size += i->len;
}
TORRENT_ASSERT(m_write_buffer_size == write_buffer_size);
#endif
}
void utp_socket_impl::subscribe_drained()
{
INVARIANT_CHECK;
if (m_subscribe_drained) return;
UTP_LOGV("%8p: socket drained\n", this);
m_subscribe_drained = true;
m_sm->subscribe_drained(this);
}
void utp_socket_impl::defer_ack()
{
INVARIANT_CHECK;
if (m_deferred_ack) return;
UTP_LOGV("%8p: defer ack\n", this);
m_deferred_ack = true;
m_sm->defer_ack(this);
}
void utp_socket_impl::remove_sack_header(packet* p)
{
INVARIANT_CHECK;
// remove the sack header
boost::uint8_t* ptr = p->buf + sizeof(utp_header);
utp_header* h = (utp_header*)p->buf;
TORRENT_ASSERT(h->extension == 1);
h->extension = ptr[0];
int sack_size = ptr[1];
TORRENT_ASSERT(h->extension == 0);
UTP_LOGV("%8p: removing SACK header, %d bytes\n"
, this, sack_size + 2);
TORRENT_ASSERT(p->size >= p->header_size);
TORRENT_ASSERT(p->header_size >= sizeof(utp_header) + sack_size + 2);
memmove(ptr, ptr + sack_size + 2, p->size - p->header_size);
p->header_size -= sack_size + 2;
p->size -= sack_size + 2;
}
// sends a packet, pulls data from the write buffer (if there's any)
// if ack is true, we need to send a packet regardless of if there's
// any data. Returns true if we could send more data (i.e. call
// send_pkt() again)
bool utp_socket_impl::send_pkt(int flags)
{
INVARIANT_CHECK;
bool force = (flags & pkt_ack) || (flags & pkt_fin);
// TORRENT_ASSERT(m_state != UTP_STATE_FIN_SENT || (flags & pkt_ack));
// first see if we need to resend any packets
for (int i = (m_acked_seq_nr + 1) & ACK_MASK; i != m_seq_nr; i = (i + 1) & ACK_MASK)
{
packet* p = (packet*)m_outbuf.at(i);
if (!p) continue;
if (!p->need_resend) continue;
if (!resend_packet(p))
{
// we couldn't resend the packet. It probably doesn't
// fit in our cwnd. If force is set, we need to continue
// to send our force anyway, if we don't have to send an
// force, we might as well return
if (!force) return false;
// resend_packet might have failed
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return false;
break;
}
// don't fast-resend this packet
if (m_fast_resend_seq_nr == i)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
}
int sack = 0;
if (m_inbuf.size())
{
// the SACK bitfield should ideally fit all
// the pieces we have successfully received
sack = (m_inbuf.span() + 7) / 8;
if (sack > 32) sack = 32;
}
int header_size = sizeof(utp_header) + (sack ? sack + 2 : 0);
int payload_size = m_write_buffer_size;
if (m_mtu - header_size < payload_size)
payload_size = m_mtu - header_size;
// if we have one MSS worth of data, make sure it fits in our
// congestion window and the advertized receive window from
// the other end.
if (m_bytes_in_flight + payload_size > (std::min)(int(m_cwnd >> 16)
, int(m_adv_wnd - m_bytes_in_flight)))
{
// this means there's not enough room in the send window for
// another packet. We have to hold off sending this data.
// we still need to send an ACK though
// if we're trying to send a FIN, make an exception
if ((flags & pkt_fin) == 0) payload_size = 0;
// we're constrained by the window size
m_last_cwnd_hit = time_now_hires();
m_cwnd_full = true;
UTP_LOGV("%8p: no space in window send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
if (!force)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send seq_nr:%d ack_nr:%d "
"id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, int(m_seq_nr), int(m_ack_nr)
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
#endif
return false;
}
}
// if we don't have any data to send, or can't send any data
// and we don't have any data to force, don't send a packet
if (payload_size == 0 && !force && !m_nagle_packet)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: skipping send (no payload and no force) seq_nr:%d ack_nr:%d "
"id:%d target:%s header_size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, int(m_seq_nr), int(m_ack_nr)
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, header_size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
#endif
return false;
}
int packet_size = header_size + payload_size;
packet* p = NULL;
boost::uint8_t* ptr = NULL;
utp_header* h = NULL;
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
bool stack_alloced = false;
#endif
// payload size being zero means we're just sending
// an force. We should not pick up the nagle packet
if (!m_nagle_packet || (payload_size == 0 && force))
{
// we only need a heap allocation if we have payload and
// need to keep the packet around (in the outbuf)
if (payload_size)
{
p = (packet*)malloc(sizeof(packet) + m_mtu);
p->allocated = m_mtu;
}
else
{
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
stack_alloced = true;
#endif
TORRENT_ASSERT(force);
// this alloca() statement won't necessarily produce
// correctly aligned memory. That's why we ask for 7 more bytes
// and adjust our pointer to be aligned later
p = (packet*)TORRENT_ALLOCA(char, sizeof(packet) + packet_size + 7);
p = (packet*)align_pointer(p);
UTP_LOGV("%8p: allocating %d bytes on the stack\n", this, packet_size);
p->allocated = packet_size;
}
p->size = packet_size;
p->header_size = packet_size - payload_size;
p->num_transmissions = 0;
p->need_resend = false;
ptr = p->buf;
h = (utp_header*)ptr;
ptr += sizeof(utp_header);
h->extension = sack ? 1 : 0;
h->connection_id = m_send_id;
// seq_nr is ignored for ST_STATE packets, so it doesn't
// matter that we say this is a sequence number we haven't
// actually sent yet
h->seq_nr = m_seq_nr;
h->type_ver = ((payload_size ? ST_DATA : ST_STATE) << 4) | 1;
write_payload(p->buf + p->header_size, payload_size);
}
else
{
// pick up the nagle packet and keep adding bytes to it
p = m_nagle_packet;
ptr = p->buf + sizeof(utp_header);
h = (utp_header*)p->buf;
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
// if the packet has a selective force header, we'll need
// to update it
if (h->extension == 1)
{
sack = ptr[1];
// if we no longer have any out-of-order packets waiting
// to be delivered, there's no selective ack to be sent.
if (m_inbuf.size() == 0)
{
// we need to remove the sack header
remove_sack_header(p);
sack = 0;
}
}
else
sack = 0;
int size_left = p->allocated - p->size;
TORRENT_ASSERT(size_left > 0);
size_left = (std::min)(size_left, m_write_buffer_size);
write_payload(p->buf + p->size, size_left);
p->size += size_left;
UTP_LOGV("%8p: NAGLE appending %d bytes to nagle packet. new size: %d allocated: %d\n"
, this, size_left, p->size, p->allocated);
// did we fill up the whole mtu?
// if we didn't, we may still send it if there's
// no bytes in flight
if (m_bytes_in_flight > 0
&& p->size < p->allocated
&& !force
&& m_nagle)
{
return false;
}
// clear the nagle packet pointer and fall through
// sending p
m_nagle_packet = NULL;
packet_size = p->size;
payload_size = p->size - p->header_size;
}
if (sack)
{
*ptr++ = 0; // end of extension chain
*ptr++ = sack; // bytes for SACK bitfield
write_sack(ptr, sack);
ptr += sack;
TORRENT_ASSERT(ptr <= p->buf + p->header_size);
}
if (m_bytes_in_flight > 0
&& p->size < p->allocated
&& !force
&& m_nagle)
{
// this is nagle. If we don't have a full packet
// worth of payload to send AND we have at least
// one outstanding packet, hold off. Once the
// outstanding packet is acked, we'll send this
// payload
UTP_LOGV("%8p: NAGLE not enough payload send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d\n"
, this, m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu);
TORRENT_ASSERT(m_nagle_packet == NULL);
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_nagle_packet = p;
return false;
}
// MTU DISCOVERY
if (m_mtu_seq == 0
&& p->size > m_mtu_floor
&& m_seq_nr != 0)
{
p->mtu_probe = true;
m_mtu_seq = m_seq_nr;
}
else
{
p->mtu_probe = false;
}
h->timestamp_difference_microseconds = m_reply_micro;
h->wnd_size = m_in_buf_size - m_buffered_incoming_bytes - m_receive_buffer_size;
h->ack_nr = m_ack_nr;
// if this is a FIN packet, override the type
if (flags & pkt_fin)
h->type_ver = (ST_FIN << 4) | 1;
// fill in the timestamp as late as possible
ptime now = time_now_hires();
p->send_time = now;
h->timestamp_microseconds = boost::uint32_t(total_microseconds(now - min_time()));
#if TORRENT_UTP_LOG
UTP_LOG("%8p: sending packet seq_nr:%d ack_nr:%d type:%s "
"id:%d target:%s size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d timestamp:%u time_diff:%u "
"mtu_probe:%d extension:%d\n"
, this, int(h->seq_nr), int(h->ack_nr), packet_type_names[h->get_type()]
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, p->size, m_error.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu, boost::uint32_t(h->timestamp_microseconds)
, boost::uint32_t(h->timestamp_difference_microseconds), int(p->mtu_probe)
, h->extension);
#endif
error_code ec;
#ifdef TORRENT_DEBUG
// simulate 1% packet loss
// if ((rand() % 100) > 0)
#endif
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)h, p->size, ec
, p->mtu_probe ? utp_socket_manager::dont_fragment : 0);
++m_out_packets;
if (ec == error::message_size)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: error sending packet: %s\n", this, ec.message().c_str());
#endif
// if we fail even though this is not a probe, we're screwed
// since we'd have to repacketize
TORRENT_ASSERT(p->mtu_probe);
m_mtu_ceiling = p->size - 1;
if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling;
update_mtu_limits();
// TODO: 2 we might want to do something else here
// as well, to resend the packet immediately without
// it being an MTU probe
p->mtu_probe = false;
if (m_mtu_seq == m_ack_nr)
m_mtu_seq = 0;
ec.clear();
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: re-sending\n", this);
#endif
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)h, p->size, ec, 0);
}
if (ec == error::would_block || ec == error::try_again)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: socket stalled\n", this);
#endif
if (!m_stalled)
{
m_stalled = true;
m_sm->subscribe_writable(this);
}
}
else if (ec)
{
TORRENT_ASSERT(stack_alloced != bool(payload_size));
if (payload_size) free(p);
m_error = ec;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return false;
}
if (!m_stalled)
++p->num_transmissions;
// if we have payload, we need to save the packet until it's acked
// and progress m_seq_nr
if (p->size > p->header_size)
{
// if we're sending a payload packet, there should not
// be a nagle packet waiting for more data
TORRENT_ASSERT(m_nagle_packet == NULL);
#if !TORRENT_UT_SEQ
// if the other end closed the connection immediately
// our FIN packet will end up having the same sequence
// number as the SYN, so this assert is invalid
TORRENT_ASSERT(!m_outbuf.at(m_seq_nr));
#endif
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
packet* old = (packet*)m_outbuf.insert(m_seq_nr, p);
if (old)
{
TORRENT_ASSERT(((utp_header*)old->buf)->seq_nr == m_seq_nr);
if (!old->need_resend) m_bytes_in_flight -= old->size - old->header_size;
free(old);
}
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
m_seq_nr = (m_seq_nr + 1) & ACK_MASK;
TORRENT_ASSERT(payload_size >= 0);
m_bytes_in_flight += p->size - p->header_size;
}
else
{
TORRENT_ASSERT(h->seq_nr == m_seq_nr);
}
// if the socket is stalled, always return false, don't
// try to write more packets. We'll keep writing once
// the underlying UDP socket becomes writable
return m_write_buffer_size > 0 && !m_cwnd_full && !m_stalled;
}
// size is in bytes
void utp_socket_impl::write_sack(boost::uint8_t* buf, int size) const
{
INVARIANT_CHECK;
TORRENT_ASSERT(m_inbuf.size());
int ack_nr = (m_ack_nr + 2) & ACK_MASK;
boost::uint8_t* end = buf + size;
for (; buf != end; ++buf)
{
*buf = 0;
int mask = 1;
for (int i = 0; i < 8; ++i)
{
if (m_inbuf.at(ack_nr)) *buf |= mask;
mask <<= 1;
ack_nr = (ack_nr + 1) & ACK_MASK;
}
}
}
bool utp_socket_impl::resend_packet(packet* p, bool fast_resend)
{
INVARIANT_CHECK;
// for fast re-sends the packet hasn't been marked as needing resending
TORRENT_ASSERT(p->need_resend || fast_resend);
if (m_error) return false;
if (((m_acked_seq_nr + 1) & ACK_MASK) == m_mtu_seq
&& m_mtu_seq != 0)
{
m_mtu_seq = 0;
p->mtu_probe = false;
// we got multiple acks for the packet before our probe, assume
// it was dropped because it was too big
m_mtu_ceiling = p->size - 1;
update_mtu_limits();
}
// we can only resend the packet if there's
// enough space in our congestion window
// since we can't re-packetize, some packets that are
// larger than the congestion window must be allowed through
// but only if we don't have any outstanding bytes
int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - m_bytes_in_flight;
if (!fast_resend
&& p->size - p->header_size > window_size_left
&& m_bytes_in_flight > 0)
{
m_last_cwnd_hit = time_now_hires();
m_cwnd_full = true;
return false;
}
// plus one since we have fast-resend as well, which doesn't
// necessarily trigger by a timeout
TORRENT_ASSERT(p->num_transmissions < m_sm->num_resends() + 1);
TORRENT_ASSERT(p->size - p->header_size >= 0);
if (p->need_resend) m_bytes_in_flight += p->size - p->header_size;
p->need_resend = false;
utp_header* h = (utp_header*)p->buf;
// update packet header
h->timestamp_difference_microseconds = m_reply_micro;
p->send_time = time_now_hires();
h->timestamp_microseconds = boost::uint32_t(total_microseconds(p->send_time - min_time()));
// if the packet has a selective ack header, we'll need
// to update it
if (h->extension == 1 && h->ack_nr != m_ack_nr)
{
boost::uint8_t* ptr = p->buf + sizeof(utp_header);
int sack_size = ptr[1];
if (m_inbuf.size())
{
// update the sack header
write_sack(ptr + 2, sack_size);
TORRENT_ASSERT(ptr + sack_size + 2 <= p->buf + p->header_size);
}
else
{
remove_sack_header(p);
}
}
h->ack_nr = m_ack_nr;
error_code ec;
m_sm->send_packet(udp::endpoint(m_remote_address, m_port)
, (char const*)p->buf, p->size, ec);
++m_out_packets;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: re-sending packet seq_nr:%d ack_nr:%d type:%s "
"id:%d target:%s size:%d error:%s send_buffer_size:%d cwnd:%d "
"adv_wnd:%d in-flight:%d mtu:%d timestamp:%u time_diff:%u\n"
, this, int(h->seq_nr), int(h->ack_nr), packet_type_names[h->get_type()]
, m_send_id, print_endpoint(udp::endpoint(m_remote_address, m_port)).c_str()
, p->size, ec.message().c_str(), m_write_buffer_size, int(m_cwnd >> 16)
, m_adv_wnd, m_bytes_in_flight, m_mtu, boost::uint32_t(h->timestamp_microseconds)
, boost::uint32_t(h->timestamp_difference_microseconds));
#endif
if (ec == error::would_block || ec == error::try_again)
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: socket stalled\n", this);
#endif
if (!m_stalled)
{
m_stalled = true;
m_sm->subscribe_writable(this);
}
}
else if (ec)
{
m_error = ec;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return false;
}
if (!m_stalled)
++p->num_transmissions;
return !m_stalled;
}
void utp_socket_impl::experienced_loss(int seq_nr)
{
INVARIANT_CHECK;
// since loss often comes in bursts, we only cut the
// window in half once per RTT. This is implemented
// by limiting which packets can cause us to cut the
// window size. The first packet that's lost will
// update the limit to the last sequence number we sent.
// i.e. only packet sent after this loss can cause another
// window size cut
if (compare_less_wrap(seq_nr, m_loss_seq_nr, ACK_MASK)) return;
// cut window size in 2
m_cwnd = (std::max)(m_cwnd * m_sm->loss_multiplier() / 100, boost::int64_t(m_mtu << 16));
m_loss_seq_nr = m_seq_nr;
UTP_LOGV("%8p: Lost packet %d caused cwnd cut\n", this, seq_nr);
// the window size could go below one MMS here, if it does,
// we'll get a timeout in about one second
// if we happen to be in slow-start mode, we need to leave it
m_slow_start = false;
}
void utp_socket_impl::maybe_inc_acked_seq_nr()
{
INVARIANT_CHECK;
bool incremented = false;
// don't pass m_seq_nr, since we move into sequence
// numbers that haven't been sent yet, and aren't
// supposed to be in m_outbuf
// if the slot in m_outbuf is 0, it means the
// packet has been ACKed and removed from the send buffer
while (((m_acked_seq_nr + 1) & ACK_MASK) != m_seq_nr
&& m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) == 0)
{
// increment the fast resend sequence number
if (m_fast_resend_seq_nr == m_acked_seq_nr)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
m_acked_seq_nr = (m_acked_seq_nr + 1) & ACK_MASK;
incremented = true;
}
if (!incremented) return;
// update loss seq number if it's less than the packet
// that was just acked. If loss seq nr is greater, it suggests
// that we're still in a window that has experienced loss
if (compare_less_wrap(m_loss_seq_nr, m_acked_seq_nr, ACK_MASK))
m_loss_seq_nr = m_acked_seq_nr;
m_duplicate_acks = 0;
}
void utp_socket_impl::ack_packet(packet* p, ptime const& receive_time
, boost::uint32_t& min_rtt, boost::uint16_t seq_nr)
{
INVARIANT_CHECK;
TORRENT_ASSERT(p);
// verify that the packet we're removing was in fact sent
// with the sequence number we expect
TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == seq_nr);
if (!p->need_resend)
{
TORRENT_ASSERT(m_bytes_in_flight >= p->size - p->header_size);
m_bytes_in_flight -= p->size - p->header_size;
}
if (seq_nr == m_mtu_seq && m_mtu_seq != 0)
{
TORRENT_ASSERT(p->mtu_probe);
// our mtu probe was acked!
m_mtu_floor = (std::max)(m_mtu_floor, p->size);
if (m_mtu_ceiling < m_mtu_floor) m_mtu_ceiling = m_mtu_floor;
update_mtu_limits();
}
// increment the acked sequence number counter
maybe_inc_acked_seq_nr();
boost::uint32_t rtt = boost::uint32_t(total_microseconds(receive_time - p->send_time));
if (receive_time < p->send_time)
{
// this means our clock is not monotonic. Just assume the RTT was 100 ms
rtt = 100000;
// the clock for this plaform is not monotonic!
TORRENT_ASSERT(false);
}
UTP_LOGV("%8p: acked packet %d (%d bytes) (rtt:%u)\n"
, this, seq_nr, p->size - p->header_size, rtt / 1000);
m_rtt.add_sample(rtt / 1000);
if (rtt < min_rtt) min_rtt = rtt;
free(p);
}
void utp_socket_impl::incoming(boost::uint8_t const* buf, int size, packet* p, ptime now)
{
INVARIANT_CHECK;
while (!m_read_buffer.empty())
{
if (p)
{
buf = p->buf + p->header_size;
TORRENT_ASSERT(p->size - p->header_size >= size);
}
iovec_t* target = &m_read_buffer.front();
int to_copy = (std::min)(size, int(target->len));
memcpy(target->buf, buf, to_copy);
m_read += to_copy;
target->buf = ((boost::uint8_t*)target->buf) + to_copy;
target->len -= to_copy;
buf += to_copy;
UTP_LOGV("%8p: copied %d bytes into user receive buffer\n", this, to_copy);
TORRENT_ASSERT(m_read_buffer_size >= to_copy);
m_read_buffer_size -= to_copy;
size -= to_copy;
if (target->len == 0) m_read_buffer.erase(m_read_buffer.begin());
if (p)
{
p->header_size += to_copy;
TORRENT_ASSERT(p->header_size <= p->size);
}
if (size == 0)
{
TORRENT_ASSERT(p == 0 || p->header_size == p->size);
free(p);
return;
}
}
TORRENT_ASSERT(m_read_buffer_size == 0);
if (!p)
{
TORRENT_ASSERT(buf);
p = (packet*)malloc(sizeof(packet) + size);
p->size = size;
p->header_size = 0;
memcpy(p->buf, buf, size);
}
// save this packet until the client issues another read
m_receive_buffer.push_back(p);
m_receive_buffer_size += p->size - p->header_size;
check_receive_buffers();
}
bool utp_socket_impl::cancel_handlers(error_code const& ec, bool kill)
{
INVARIANT_CHECK;
TORRENT_ASSERT(ec);
bool ret = m_read_handler || m_write_handler || m_connect_handler;
// calling the callbacks with m_userdata being 0 will just crash
TORRENT_ASSERT((ret && bool(m_userdata)) || !ret);
if (m_read_handler) m_read_handler(m_userdata, 0, ec, kill);
m_read_handler = 0;
if (m_write_handler) m_write_handler(m_userdata, 0, ec, kill);
m_write_handler = 0;
if (m_connect_handler) m_connect_handler(m_userdata, ec, kill);
m_connect_handler = 0;
return ret;
}
bool utp_socket_impl::consume_incoming_data(
utp_header const* ph, boost::uint8_t const* ptr, int payload_size
, ptime now)
{
INVARIANT_CHECK;
if (ph->get_type() != ST_DATA) return false;
if (m_eof && m_ack_nr == m_eof_seq_nr)
{
// What?! We've already received a FIN and everything up
// to it has been acked. Ignore this packet
return true;
}
if (ph->seq_nr == ((m_ack_nr + 1) & ACK_MASK))
{
TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == 0);
// we received a packet in order
incoming(ptr, payload_size, 0, now);
m_ack_nr = (m_ack_nr + 1) & ACK_MASK;
// If this packet was previously in the reorder buffer
// it would have been acked when m_ack_nr-1 was acked.
TORRENT_ASSERT(m_inbuf.at(m_ack_nr) == 0);
UTP_LOGV("%8p: remove inbuf: %d (%d)\n"
, this, m_ack_nr, int(m_inbuf.size()));
for (;;)
{
int const next_ack_nr = (m_ack_nr + 1) & ACK_MASK;
packet* p = (packet*)m_inbuf.remove(next_ack_nr);
if (!p)
break;
m_buffered_incoming_bytes -= p->size - p->header_size;
incoming(0, p->size - p->header_size, p, now);
m_ack_nr = next_ack_nr;
UTP_LOGV("%8p: reordered remove inbuf: %d (%d)\n"
, this, m_ack_nr, int(m_inbuf.size()));
}
}
else
{
// this packet was received out of order. Stick it in the
// reorder buffer until it can be delivered in order
// have we already received this packet and passed it on
// to the client?
if (!compare_less_wrap(m_ack_nr, ph->seq_nr, ACK_MASK))
{
UTP_LOGV("%8p: already received seq_nr: %d\n"
, this, int(ph->seq_nr));
return true;
}
// do we already have this packet? If so, just ignore it
if (m_inbuf.at(ph->seq_nr))
{
UTP_LOGV("%8p: already received seq_nr: %d\n"
, this, int(ph->seq_nr));
return true;
}
// we don't need to save the packet header, just the payload
packet* p = (packet*)malloc(sizeof(packet) + payload_size);
p->size = payload_size;
p->header_size = 0;
p->num_transmissions = 0;
p->need_resend = false;
memcpy(p->buf, ptr, payload_size);
m_inbuf.insert(ph->seq_nr, p);
m_buffered_incoming_bytes += p->size;
UTP_LOGV("%8p: out of order. insert inbuf: %d (%d) m_ack_nr: %d\n"
, this, int(ph->seq_nr), int(m_inbuf.size()), m_ack_nr);
}
return false;
}
// returns true of the socket was closed
bool utp_socket_impl::test_socket_state()
{
INVARIANT_CHECK;
// if the socket is in a state where it's dead, just waiting to
// tell the client that it's closed. Do that and transition into
// the deleted state, where it will be deleted
// it might be possible to get here twice, in which we need to
// cancel any new handlers as well, even though we're already
// in the delete state
if (!m_error) return false;
TORRENT_ASSERT(m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE);
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s error:%s\n"
, this, socket_state_names[m_state], m_error.message().c_str());
#endif
if (cancel_handlers(m_error, true))
{
m_state = UTP_STATE_DELETE;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]);
#endif
return true;
}
return false;
}
void utp_socket_impl::init_mtu(int link_mtu, int utp_mtu)
{
INVARIANT_CHECK;
// if we're in a RAM constrained environment, don't increase
// the buffer size for interfaces with large MTUs. Just stick
// to ethernet frame sizes
if (m_sm->allow_dynamic_sock_buf())
{
// Make sure that we have enough socket buffer space
// for sending and receiving packets of this size
// add 10% for smaller ACKs and other overhead
m_sm->set_sock_buf(link_mtu * 11 / 10);
}
else if (link_mtu > TORRENT_ETHERNET_MTU)
{
// we can't use larger packets than this since we're
// not allocating any more memory for socket buffers
int decrease = link_mtu - TORRENT_ETHERNET_MTU;
utp_mtu -= decrease;
link_mtu -= decrease;
}
// set the ceiling to what we found out from the interface
m_mtu_ceiling = utp_mtu;
// however, start the search from a more conservative MTU
int overhead = link_mtu - utp_mtu;
m_mtu = TORRENT_ETHERNET_MTU - overhead;
if (m_mtu > m_mtu_ceiling) m_mtu = m_mtu_ceiling;
if (m_mtu_floor > utp_mtu) m_mtu_floor = utp_mtu;
// if the window size is smaller than one packet size
// set it to one
if ((m_cwnd >> 16) < m_mtu) m_cwnd = boost::int64_t(m_mtu) << 16;
UTP_LOGV("%8p: initializing MTU to: %d [%d, %d]\n"
, this, m_mtu, m_mtu_floor, m_mtu_ceiling);
}
// return false if this is an invalid packet
bool utp_socket_impl::incoming_packet(boost::uint8_t const* buf, int size
, udp::endpoint const& ep, ptime receive_time)
{
INVARIANT_CHECK;
utp_header* ph = (utp_header*)buf;
if (ph->get_version() != 1)
{
UTP_LOGV("%8p: incoming packet version:%d (ignored)\n"
, this, int(ph->get_version()));
return false;
}
// SYN packets have special (reverse) connection ids
if (ph->get_type() != ST_SYN && ph->connection_id != m_recv_id)
{
UTP_LOGV("%8p: incoming packet id:%d expected:%d (ignored)\n"
, this, int(ph->connection_id), int(m_recv_id));
return false;
}
if (ph->get_type() >= NUM_TYPES)
{
UTP_LOGV("%8p: incoming packet type:%d (ignored)\n"
, this, int(ph->get_type()));
return false;
}
if (m_state == UTP_STATE_NONE && ph->get_type() == ST_SYN)
{
m_remote_address = ep.address();
m_port = ep.port();
}
if (m_state != UTP_STATE_NONE && ph->get_type() == ST_SYN)
{
UTP_LOGV("%8p: incoming packet type:ST_SYN (ignored)\n", this);
return true;
}
bool step = false;
if (receive_time - m_last_history_step > minutes(1))
{
step = true;
m_last_history_step = receive_time;
}
// this is the difference between their send time and our receive time
// 0 means no sample yet
boost::uint32_t their_delay = 0;
if (ph->timestamp_microseconds != 0)
{
m_reply_micro = boost::uint32_t(total_microseconds(receive_time - min_time()))
- ph->timestamp_microseconds;
boost::uint32_t prev_base = m_their_delay_hist.initialized() ? m_their_delay_hist.base() : 0;
their_delay = m_their_delay_hist.add_sample(m_reply_micro, step);
int base_change = m_their_delay_hist.base() - prev_base;
UTP_LOGV("%8p: their_delay::add_sample:%u prev_base:%u new_base:%u\n"
, this, m_reply_micro, prev_base, m_their_delay_hist.base());
if (prev_base && base_change < 0 && base_change > -10000 && m_delay_hist.initialized())
{
// their base delay went down. This is caused by clock drift. To compensate,
// adjust our base delay upwards
// don't adjust more than 10 ms. If the change is that big, something is probably wrong
m_delay_hist.adjust_base(-base_change);
}
UTP_LOGV("%8p: incoming packet reply_micro:%u base_change:%d\n"
, this, m_reply_micro, prev_base ? base_change : 0);
}
if (ph->get_type() == ST_RESET)
{
UTP_LOGV("%8p: incoming packet type:RESET\n", this);
m_error = asio::error::connection_reset;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return true;
}
// is this ACK valid? If the other end is ACKing
// a packet that hasn't been sent yet
// just ignore it. A 3rd party could easily inject a packet
// like this in a stream, don't sever it because of it.
// since m_seq_nr is the sequence number of the next packet
// we'll send (and m_seq_nr-1 was the last packet we sent),
// if the ACK we got is greater than the last packet we sent
// something is wrong.
// If our state is state_none, this packet must be a syn packet
// and the ack_nr should be ignored
boost::uint16_t cmp_seq_nr = (m_seq_nr - 1) & ACK_MASK;
#if TORRENT_UT_SEQ
if (m_state == UTP_STATE_SYN_SENT && ph->get_type() == ST_STATE)
cmp_seq_nr = m_seq_nr;
#endif
if (m_state != UTP_STATE_NONE
&& compare_less_wrap(cmp_seq_nr, ph->ack_nr, ACK_MASK))
{
UTP_LOGV("%8p: incoming packet ack_nr:%d our seq_nr:%d (ignored)\n"
, this, int(ph->ack_nr), m_seq_nr);
return true;
}
// check to make sure the sequence number of this packet
// is reasonable. If it's a data packet and we've already
// received it, ignore it. This is either a stray old packet
// that finally made it here (after having been re-sent) or
// an attempt to interfere with the connection from a 3rd party
// in both cases, we can safely ignore the timestamp and ACK
// information in this packet
/*
// even if we've already received this packet, we need to
// send another ack to it, since it may be a resend caused by
// our ack getting dropped
if (m_state != UTP_STATE_SYN_SENT
&& ph->get_type() == ST_DATA
&& !compare_less_wrap(m_ack_nr, ph->seq_nr, ACK_MASK))
{
// we've already received this packet
UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n"
, this, int(ph->seq_nr), m_ack_nr);
return true;
}
*/
// if the socket is closing, always ignore any packet
// with a higher sequence number than the FIN sequence number
if (m_eof && compare_less_wrap(m_eof_seq_nr, ph->seq_nr, ACK_MASK))
{
UTP_LOGV("%8p: incoming packet seq_nr:%d eof_seq_nr:%d (ignored)\n"
, this, int(ph->seq_nr), m_eof_seq_nr);
}
if (m_state != UTP_STATE_NONE
&& m_state != UTP_STATE_SYN_SENT
&& compare_less_wrap((m_ack_nr + max_packets_reorder) & ACK_MASK, ph->seq_nr, ACK_MASK))
{
// this is too far out to fit in our reorder buffer. Drop it
// This is either an attack to try to break the connection
// or a seariously damaged connection that lost a lot of
// packets. Neither is very likely, and it should be OK
// to drop the timestamp information.
UTP_LOGV("%8p: incoming packet seq_nr:%d our ack_nr:%d (ignored)\n"
, this, int(ph->seq_nr), m_ack_nr);
return true;
}
++m_in_packets;
// this is a valid incoming packet, update the timeout timer
m_num_timeouts = 0;
m_timeout = receive_time + milliseconds(packet_timeout());
UTP_LOGV("%8p: updating timeout to: now + %d\n"
, this, packet_timeout());
// the test for INT_MAX here is a work-around for a bug in uTorrent where
// it's sometimes sent as INT_MAX when it is in fact uninitialized
const boost::uint32_t sample = ph->timestamp_difference_microseconds == INT_MAX
? 0 : ph->timestamp_difference_microseconds;
boost::uint32_t delay = 0;
if (sample != 0)
{
delay = m_delay_hist.add_sample(sample, step);
m_delay_sample_hist[m_delay_sample_idx++] = delay;
if (m_delay_sample_idx >= num_delay_hist) m_delay_sample_idx = 0;
}
int acked_bytes = 0;
TORRENT_ASSERT(m_bytes_in_flight >= 0);
int prev_bytes_in_flight = m_bytes_in_flight;
m_adv_wnd = ph->wnd_size;
// if we get an ack for the same sequence number as
// was last ACKed, and we have outstanding packets,
// it counts as a duplicate ack
if (ph->ack_nr == m_acked_seq_nr && m_outbuf.size())
{
++m_duplicate_acks;
}
boost::uint32_t min_rtt = UINT_MAX;
TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1);
// has this packet already been ACKed?
// if the ACK we just got is less than the max ACKed
// sequence number, it doesn't tell us anything.
// So, only act on it if the ACK is greater than the last acked
// sequence number
if (m_state != UTP_STATE_NONE && compare_less_wrap(m_acked_seq_nr, ph->ack_nr, ACK_MASK))
{
int const next_ack_nr = ph->ack_nr;
for (int ack_nr = (m_acked_seq_nr + 1) & ACK_MASK;
ack_nr != ((next_ack_nr + 1) & ACK_MASK);
ack_nr = (ack_nr + 1) & ACK_MASK)
{
if (m_fast_resend_seq_nr == ack_nr)
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
packet* p = (packet*)m_outbuf.remove(ack_nr);
if (!p) continue;
acked_bytes += p->size - p->header_size;
ack_packet(p, receive_time, min_rtt, ack_nr);
}
maybe_inc_acked_seq_nr();
}
// look for extended headers
boost::uint8_t const* ptr = buf;
ptr += sizeof(utp_header);
unsigned int extension = ph->extension;
while (extension)
{
// invalid packet. It says it has an extension header
// but the packet is too short
if (ptr - buf + 2 > size)
{
UTP_LOGV("%8p: invalid extension header\n", this);
return true;
}
int next_extension = *ptr++;
int len = *ptr++;
if (len < 0)
{
UTP_LOGV("%8p: invalid extension length:%d packet:%d\n"
, this, len, int(ptr - buf));
return true;
}
if (ptr - buf + len > size_t(size))
{
UTP_LOGV("%8p: invalid extension header size:%d packet:%d\n"
, this, len, int(ptr - buf));
return true;
}
switch(extension)
{
case 1: // selective ACKs
parse_sack(ph->ack_nr, ptr, len, &acked_bytes, receive_time, min_rtt);
break;
}
ptr += len;
extension = next_extension;
}
// the send operation in parse_sack() may have set the socket to an error
// state, in which case we shouldn't continue
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
if (m_duplicate_acks >= dup_ack_limit
&& ((m_acked_seq_nr + 1) & ACK_MASK) == m_fast_resend_seq_nr)
{
// LOSS
UTP_LOGV("%8p: Packet %d lost.\n", this, m_fast_resend_seq_nr);
// resend the lost packet
packet* p = (packet*)m_outbuf.at(m_fast_resend_seq_nr);
TORRENT_ASSERT(p);
if (p)
{
experienced_loss(m_fast_resend_seq_nr);
resend_packet(p, true);
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
}
// don't fast-resend this again
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
}
// ptr points to the payload of the packet
// size is the packet size, payload is the
// number of payload bytes are in this packet
const int header_size = ptr - buf;
const int payload_size = size - header_size;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: incoming packet seq_nr:%d ack_nr:%d type:%s id:%d size:%d timestampdiff:%u timestamp:%u "
"our ack_nr:%d our seq_nr:%d our acked_seq_nr:%d our state:%s\n"
, this, int(ph->seq_nr), int(ph->ack_nr), packet_type_names[ph->get_type()]
, int(ph->connection_id), payload_size, boost::uint32_t(ph->timestamp_difference_microseconds)
, boost::uint32_t(ph->timestamp_microseconds), m_ack_nr, m_seq_nr, m_acked_seq_nr, socket_state_names[m_state]);
#endif
if (ph->get_type() == ST_FIN)
{
// We ignore duplicate FIN packets, but we still need to ACK them.
if (ph->seq_nr == ((m_ack_nr + 1) & ACK_MASK)
|| ph->seq_nr == m_ack_nr)
{
UTP_LOGV("%8p: FIN received in order\n", this);
// The FIN arrived in order, nothing else is in the
// reorder buffer.
// TORRENT_ASSERT(m_inbuf.size() == 0);
m_ack_nr = ph->seq_nr;
// Transition to UTP_STATE_FIN_SENT. The sent FIN is also an ack
// to the FIN we received. Once we're in UTP_STATE_FIN_SENT we
// just need to wait for our FIN to be acked.
if (m_state == UTP_STATE_FIN_SENT)
{
send_pkt(pkt_ack);
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
}
else
{
send_fin();
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
}
}
if (m_eof)
{
UTP_LOGV("%8p: duplicate FIN packet (ignoring)\n", this);
return true;
}
m_eof = true;
m_eof_seq_nr = ph->seq_nr;
// we will respond with a fin once we have received everything up to m_eof_seq_nr
}
switch (m_state)
{
case UTP_STATE_NONE:
{
if (ph->get_type() == ST_SYN)
{
// if we're in state_none, the only thing
// we accept are SYN packets.
m_state = UTP_STATE_CONNECTED;
m_remote_address = ep.address();
m_port = ep.port();
error_code ec;
m_local_address = m_sm->local_endpoint(m_remote_address, ec).address();
m_ack_nr = ph->seq_nr;
m_seq_nr = random();
m_acked_seq_nr = (m_seq_nr - 1) & ACK_MASK;
m_loss_seq_nr = m_acked_seq_nr;
m_fast_resend_seq_nr = m_seq_nr;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: received ST_SYN state:%s seq_nr:%d ack_nr:%d\n"
, this, socket_state_names[m_state], m_seq_nr, m_ack_nr);
#endif
TORRENT_ASSERT(m_send_id == ph->connection_id);
TORRENT_ASSERT(m_recv_id == ((m_send_id + 1) & 0xffff));
defer_ack();
return true;
}
else
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: type:%s state:%s (ignored)\n"
, this, packet_type_names[ph->get_type()], socket_state_names[m_state]);
#endif
return true;
}
break;
}
case UTP_STATE_SYN_SENT:
{
// just wait for an ack to our SYN, ignore everything else
if (ph->ack_nr != ((m_seq_nr - 1) & ACK_MASK))
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: incorrect ack_nr (%d) waiting for %d\n"
, this, int(ph->ack_nr), (m_seq_nr - 1) & ACK_MASK);
#endif
return true;
}
TORRENT_ASSERT(!m_error);
m_state = UTP_STATE_CONNECTED;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: state:%s\n", this, socket_state_names[m_state]);
#endif
// only progress our ack_nr on ST_DATA messages
// since our m_ack_nr is uninitialized at this point
// we still need to set it to something regardless
if (ph->get_type() == ST_DATA)
m_ack_nr = ph->seq_nr;
else
m_ack_nr = (ph->seq_nr - 1) & ACK_MASK;
// notify the client that the socket connected
if (m_connect_handler)
{
UTP_LOGV("%8p: calling connect handler\n", this);
m_connect_handler(m_userdata, m_error, false);
}
m_connect_handler = 0;
// fall through
}
case UTP_STATE_CONNECTED:
{
// the lowest seen RTT can be used to clamp the delay
// within reasonable bounds. The one-way delay is never
// higher than the round-trip time.
if (sample && acked_bytes && prev_bytes_in_flight)
{
// it's impossible for delay to be more than the RTT, so make
// sure to clamp it as a sanity check
if (delay > min_rtt) delay = min_rtt;
// only use the minimum from the last 3 delay measurements
delay = *std::min_element(m_delay_sample_hist, m_delay_sample_hist + num_delay_hist);
do_ledbat(acked_bytes, delay, prev_bytes_in_flight, receive_time);
m_send_delay = delay;
}
m_recv_delay = (std::min)(their_delay, min_rtt);
consume_incoming_data(ph, ptr, payload_size, receive_time);
// the parameter to send_pkt tells it if we're acking data
// If we are, we'll send an ACK regardless of if we have any
// space left in our send window or not. If we just got an ACK
// (i.e. ST_STATE) we're not ACKing anything. If we just
// received a FIN packet, we need to ack that as well
bool has_ack = ph->get_type() == ST_DATA || ph->get_type() == ST_FIN || ph->get_type() == ST_SYN;
int prev_out_packets = m_out_packets;
// try to send more data as long as we can
// if send_pkt returns true
while (send_pkt());
if (has_ack && prev_out_packets == m_out_packets)
{
// we need to ack some data we received, and we didn't
// end up sending any payload packets in the loop
// above (becasue m_out_packets would have been incremented
// in that case). This means we need to send an ack.
// don't do it right away, because we may still receive
// more packets. defer the ack to send as few acks as possible
defer_ack();
}
// we may want to call the user callback function at the end
// of this round. Subscribe to that event
subscribe_drained();
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
// Everything up to the FIN has been receieved, respond with a FIN
// from our side.
if (m_eof && m_ack_nr == ((m_eof_seq_nr - 1) & ACK_MASK))
{
UTP_LOGV("%8p: incoming stream consumed\n", this);
// This transitions to the UTP_STATE_FIN_SENT state.
send_fin();
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return true;
}
#if TORRENT_UTP_LOG
if (sample && acked_bytes && prev_bytes_in_flight)
{
char their_delay_base[20];
if (m_their_delay_hist.initialized())
snprintf(their_delay_base, sizeof(their_delay_base), "%u", m_their_delay_hist.base());
else
strcpy(their_delay_base, "-");
char our_delay_base[20];
if (m_delay_hist.initialized())
snprintf(our_delay_base, sizeof(our_delay_base), "%u", m_delay_hist.base());
else
strcpy(our_delay_base, "-");
UTP_LOG("%8p: "
"actual_delay:%u "
"our_delay:%f "
"their_delay:%f "
"off_target:%f "
"max_window:%u "
"upload_rate:%d "
"delay_base:%s "
"delay_sum:%f "
"target_delay:%d "
"acked_bytes:%d "
"cur_window:%d "
"scaled_gain:%f "
"rtt:%u "
"rate:%d "
"quota:%d "
"wnduser:%u "
"rto:%d "
"timeout:%d "
"get_microseconds:%u "
"cur_window_packets:%u "
"packet_size:%d "
"their_delay_base:%s "
"their_actual_delay:%u "
"seq_nr:%u "
"acked_seq_nr:%u "
"reply_micro:%u "
"min_rtt:%u "
"send_buffer:%d "
"recv_buffer:%d "
"fast_resend_seq_nr:%d "
"\n"
, this
, sample
, float(delay / 1000.f)
, float(their_delay / 1000.f)
, float(int(m_sm->target_delay() - delay)) / 1000.f
, boost::uint32_t(m_cwnd >> 16)
, 0
, our_delay_base
, float(delay + their_delay) / 1000.f
, m_sm->target_delay() / 1000
, acked_bytes
, m_bytes_in_flight
, 0.f // float(scaled_gain)
, m_rtt.mean()
, int((m_cwnd * 1000 / (m_rtt.mean()?m_rtt.mean():50)) >> 16)
, 0
, m_adv_wnd
, packet_timeout()
, int(total_milliseconds(m_timeout - receive_time))
, int(total_microseconds(receive_time - min_time()))
, (m_seq_nr - m_acked_seq_nr) & ACK_MASK
, m_mtu
, their_delay_base
, boost::uint32_t(m_reply_micro)
, m_seq_nr
, m_acked_seq_nr
, m_reply_micro
, min_rtt / 1000
, m_write_buffer_size
, m_read_buffer_size
, m_fast_resend_seq_nr);
}
#endif
return true;
}
case UTP_STATE_FIN_SENT:
{
// There are two ways we can end up in this state:
//
// 1. If the socket has been explicitly closed on our
// side, in which case m_eof is false.
//
// 2. If we received a FIN from the remote side, in which
// case m_eof is true. If this is the case, we don't
// come here until everything up to the FIN has been
// received.
//
//
//
// At this point m_seq_nr - 1 is the FIN sequence number.
// We can receive both ST_DATA and ST_STATE here, because after
// we have closed our end of the socket, the remote end might
// have data in the pipeline. We don't really care about the
// data, but we do have to ack it. Or rather, we have to ack
// the FIN that will come after the data.
// Case 1:
// ---------------------------------------------------------------
//
// If we are here because the local endpoint was closed, we need
// to first wait for all of our messages to be acked:
//
// if (m_acked_seq_nr == ((m_seq_nr - 1) & ACK_MASK))
//
// `m_seq_nr - 1` is the ST_FIN message that we sent.
//
// ----------------------
//
// After that has happened we need to wait for the remote side
// to send its ST_FIN message. When we receive that we send an
// ST_STATE back to ack, and wait for a sufficient period.
// During this wait we keep acking incoming ST_FIN's. This is
// all handled at the top of this function.
//
// Note that the user handlers are all cancelled when the initial
// close() call happens, so nothing will happen on the user side
// after that.
// Case 2:
// ---------------------------------------------------------------
//
// If we are here because we received a ST_FIN message, and then
// sent our own ST_FIN to ack that, we need to wait for our ST_FIN
// to be acked:
//
// if (m_acked_seq_nr == ((m_seq_nr - 1) & ACK_MASK))
//
// `m_seq_nr - 1` is the ST_FIN message that we sent.
//
// After that has happened we know the remote side has all our
// data, and we can gracefully shut down.
if (consume_incoming_data(ph, ptr, payload_size, receive_time))
return true;
if (m_acked_seq_nr == ((m_seq_nr - 1) & ACK_MASK))
{
// When this happens we know that the remote side has
// received all of our packets.
UTP_LOGV("%8p: FIN acked\n", this);
if (!m_attached)
{
UTP_LOGV("%8p: close initiated here, delete socket\n", this);
m_error = asio::error::eof;
m_state = UTP_STATE_DELETE;
test_socket_state();
}
else
{
UTP_LOGV("%8p: closing socket\n", this);
m_error = asio::error::eof;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
}
}
return true;
}
case UTP_STATE_DELETE:
default:
{
// respond with a reset
send_reset(ph);
return true;
}
}
return false;
}
void utp_socket_impl::do_ledbat(int acked_bytes, int delay, int in_flight, ptime const now)
{
INVARIANT_CHECK;
// the portion of the in-flight bytes that were acked. This is used to make
// the gain factor be scaled by the rtt. The formula is applied once per
// rtt, or on every ACK skaled by the number of ACKs per rtt
TORRENT_ASSERT(in_flight > 0);
TORRENT_ASSERT(acked_bytes > 0);
int target_delay = m_sm->target_delay();
// all of these are fixed points with 16 bits fraction portion
boost::int64_t window_factor = (boost::int64_t(acked_bytes) << 16) / in_flight;
boost::int64_t delay_factor = (boost::int64_t(target_delay - delay) << 16) / target_delay;
boost::int64_t scaled_gain;
if (delay >= target_delay)
{
UTP_LOGV("%8p: off_target: %d slow_start -> 0\n", this, target_delay - delay);
m_slow_start = false;
}
boost::int64_t linear_gain = (window_factor * delay_factor) >> 16;
linear_gain *= boost::int64_t(m_sm->gain_factor());
if (m_slow_start)
{
// mimic TCP slow-start by adding the number of acked
// bytes to cwnd
scaled_gain = (std::max)(boost::int64_t(acked_bytes) << 16, linear_gain);
}
else
{
scaled_gain = linear_gain;
}
// make sure we don't wrap the cwnd
if (scaled_gain >= std::numeric_limits<boost::int64_t>::max() - m_cwnd)
scaled_gain = std::numeric_limits<boost::int64_t>::max() - m_cwnd - 1;
if (scaled_gain > 0 && !m_cwnd_full
&& m_last_cwnd_hit + milliseconds(50) < now)
{
UTP_LOGV("%8p: last_cwnd_hit:%d full_cwnd:%d scaled_gain -> 0, slow_start -> 0\n", this
, total_milliseconds(now - m_last_cwnd_hit), int(m_cwnd_full));
// we haven't bumped into the cwnd limit size in the last second
// this probably means we have a send rate limit, so we shouldn't make
// the cwnd size any larger
scaled_gain = 0;
m_slow_start = false;
}
UTP_LOGV("%8p: do_ledbat delay:%d off_target: %d window_factor:%f target_factor:%f "
"scaled_gain:%f cwnd:%d slow_start:%d\n"
, this, delay, target_delay - delay, window_factor / float(1 << 16)
, delay_factor / float(1 << 16)
, scaled_gain / float(1 << 16), int(m_cwnd >> 16)
, int(m_slow_start));
// if scaled_gain + m_cwnd <= 0, set m_cwnd to 0
if (-scaled_gain >= m_cwnd)
{
m_cwnd = 0;
}
else
{
m_cwnd += scaled_gain;
TORRENT_ASSERT(m_cwnd > 0);
}
TORRENT_ASSERT(m_cwnd >= 0);
int window_size_left = (std::min)(int(m_cwnd >> 16), int(m_adv_wnd)) - in_flight + acked_bytes;
if (window_size_left >= m_mtu)
{
UTP_LOGV("%8p: mtu:%d in_flight:%d adv_wnd:%d cwnd:%d acked_bytes:%d cwnd_full -> 0\n"
, this, m_mtu, in_flight, int(m_adv_wnd), int(m_cwnd >> 16), acked_bytes);
if (m_cwnd_full) m_last_cwnd_hit = time_now_hires();
m_cwnd_full = false;
}
if ((m_cwnd >> 16) >= m_adv_wnd)
m_slow_start = false;
}
void utp_stream::bind(endpoint_type const& ep, error_code& ec) { }
// returns the number of milliseconds a packet would have before
// it would time-out if it was sent right now. Takes the RTT estimate
// into account
int utp_socket_impl::packet_timeout() const
{
INVARIANT_CHECK;
// SYN packets have a bit longer timeout, since we don't
// have an RTT estimate yet, make a conservative guess
if (m_state == UTP_STATE_NONE) return 3000;
// avoid overflow by simply capping based on number of timeouts as well
if (m_num_timeouts >= 7) return 60000;
int timeout = (std::max)(m_sm->min_timeout(), m_rtt.mean() + m_rtt.avg_deviation() * 2);
if (m_num_timeouts > 0) timeout += (1 << (int(m_num_timeouts) - 1)) * 1000;
return timeout;
}
void utp_socket_impl::tick(ptime const& now)
{
INVARIANT_CHECK;
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: tick:%s r: %d (%s) w: %d (%s)\n"
, this, socket_state_names[m_state], m_read, m_read_handler ? "handler" : "no handler"
, m_written, m_write_handler ? "handler" : "no handler");
#endif
TORRENT_ASSERT(m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK) || ((m_seq_nr - m_acked_seq_nr) & ACK_MASK) <= 1);
// if we're already in an error state, we're just waiting for the
// client to perform an operation so that we can communicate the
// error. No need to do anything else with this socket
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return;
if (now > m_timeout)
{
// TIMEOUT!
// set cwnd to 1 MSS
if (m_outbuf.size()) ++m_num_timeouts;
if (m_num_timeouts > m_sm->num_resends())
{
// the connection is dead
m_error = asio::error::timed_out;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return;
}
if (((m_acked_seq_nr + 1) & ACK_MASK) == m_mtu_seq
&& ((m_seq_nr - 1) & ACK_MASK) == m_mtu_seq
&& m_mtu_seq != 0)
{
// we timed out, and the only outstanding packet
// we had was the probe. Assume it was dropped
// because it was too big
m_mtu_ceiling = m_mtu - 1;
if (m_mtu_floor > m_mtu_ceiling) m_mtu_floor = m_mtu_ceiling;
update_mtu_limits();
}
if (m_bytes_in_flight == 0 && (m_cwnd >> 16) >= m_mtu)
{
// this is just a timeout because this direction of
// the stream is idle. Don't reset the cwnd, just decay it
m_cwnd = (std::max)(m_cwnd * 2 / 3, boost::int64_t(m_mtu) << 16);
}
else
{
// we timed out because a packet was not ACKed or because
// the cwnd was made smaller than one packet
m_cwnd = boost::int64_t(m_mtu) << 16;
}
TORRENT_ASSERT(m_cwnd >= 0);
m_timeout = now + milliseconds(packet_timeout());
UTP_LOGV("%8p: timeout resetting cwnd:%d\n"
, this, int(m_cwnd >> 16));
// we dropped all packets, that includes the mtu probe
m_mtu_seq = 0;
// since we've already timed out now, don't count
// loss that we might detect for packets that just
// timed out
m_loss_seq_nr = m_seq_nr;
// we need to go one past m_seq_nr to cover the case
// where we just sent a SYN packet and then adjusted for
// the uTorrent sequence number reuse
for (int i = m_acked_seq_nr & ACK_MASK;
i != ((m_seq_nr + 1) & ACK_MASK);
i = (i + 1) & ACK_MASK)
{
packet* p = (packet*)m_outbuf.at(i);
if (!p) continue;
if (p->need_resend) continue;
p->need_resend = true;
TORRENT_ASSERT(m_bytes_in_flight >= p->size - p->header_size);
m_bytes_in_flight -= p->size - p->header_size;
UTP_LOGV("%8p: Packet %d lost.\n", this, i);
}
TORRENT_ASSERT(m_bytes_in_flight == 0);
// if we have a packet that needs re-sending, resend it
packet* p = (packet*)m_outbuf.at((m_acked_seq_nr + 1) & ACK_MASK);
if (p)
{
if (p->num_transmissions >= m_sm->num_resends()
|| (m_state == UTP_STATE_SYN_SENT && p->num_transmissions >= m_sm->syn_resends())
|| (m_state == UTP_STATE_FIN_SENT && p->num_transmissions >= m_sm->fin_resends()))
{
#if TORRENT_UTP_LOG
UTP_LOGV("%8p: %d failed sends in a row. Socket timed out. state:%s\n"
, this, p->num_transmissions, socket_state_names[m_state]);
#endif
// the connection is dead
m_error = asio::error::timed_out;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return;
}
// don't fast-resend this packet
if (m_fast_resend_seq_nr == ((m_acked_seq_nr + 1) & ACK_MASK))
m_fast_resend_seq_nr = (m_fast_resend_seq_nr + 1) & ACK_MASK;
// the packet timed out, resend it
resend_packet(p);
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return;
}
else if (m_state < UTP_STATE_FIN_SENT)
{
send_pkt();
if (m_state == UTP_STATE_ERROR_WAIT || m_state == UTP_STATE_DELETE) return;
}
else if (m_state == UTP_STATE_FIN_SENT)
{
// the connection is dead
m_error = asio::error::eof;
m_state = UTP_STATE_ERROR_WAIT;
test_socket_state();
return;
}
}
switch (m_state)
{
case UTP_STATE_NONE:
case UTP_STATE_DELETE:
return;
// case UTP_STATE_SYN_SENT:
//
// break;
}
}
void utp_socket_impl::check_receive_buffers() const
{
INVARIANT_CHECK;
std::size_t size = 0;
for (std::vector<packet*>::const_iterator i = m_receive_buffer.begin()
, end(m_receive_buffer.end()); i != end; ++i)
{
if (packet const* p = *i)
size += p->size - p->header_size;
}
TORRENT_ASSERT(int(size) == m_receive_buffer_size);
}
#if defined TORRENT_DEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
void utp_socket_impl::check_invariant() const
{
for (int i = m_outbuf.cursor();
i != ((m_outbuf.cursor() + m_outbuf.span()) & ACK_MASK);
i = (i + 1) & ACK_MASK)
{
packet* p = (packet*)m_outbuf.at(i);
if (m_mtu_seq == i && m_mtu_seq != 0 && p)
{
TORRENT_ASSERT(p->mtu_probe);
}
if (!p) continue;
TORRENT_ASSERT(((utp_header*)p->buf)->seq_nr == i);
}
if (m_nagle_packet)
{
// if this packet is full, it should have been sent
TORRENT_ASSERT(m_nagle_packet->size < m_nagle_packet->allocated);
}
}
#endif
}