Work-in-progress on TCP:

* Changed the implementation to be more BSD like; state variables are now
  the same set as usual.
* The BufferQueue didn't use the initial sequence correctly (problems with
  SYN sequence).
* It now also removes data out of the current data set (ie. data that was
  already read by the application).
* BufferQueue::Get() also didn't work correctly (the version used by sending
  data).
* Fixed various issues around the code like incorrect handling of unexpected
  data.
* TCP options don't need the end-of-options marker in case they fill up the
  data already, also, the end-of-options marker doesn't need to be padded.
* Options are now only processed during SYN - other options may come
  later (timestamps are candidate number one).
* Also broke what was working before: connections do no longer work!


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@19376 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2006-11-26 23:29:08 +00:00
parent fc8c0ec946
commit 1166a4f9b5
6 changed files with 221 additions and 70 deletions

View File

@ -67,12 +67,13 @@ BufferQueue::Add(net_buffer *buffer)
void
BufferQueue::Add(net_buffer *buffer, tcp_sequence sequence)
{
TRACE(("BufferQueue@%p::Add(buffer %p, size %lu, sequence %lu)\n",
this, buffer, buffer->size, (uint32)sequence));
buffer->sequence = sequence;
if (fList.IsEmpty() || fFirstSequence > sequence)
fFirstSequence = sequence;
if (fList.IsEmpty() || sequence >= fLastSequence) {
// we usually just add the buffer to the
// we usually just add the buffer to the end of the queue
fList.Add(buffer);
if (sequence == fLastSequence && fLastSequence - fFirstSequence == fNumBytes) {
@ -88,6 +89,12 @@ BufferQueue::Add(net_buffer *buffer, tcp_sequence sequence)
if (fLastSequence < sequence + buffer->size)
fLastSequence = sequence + buffer->size;
if (fFirstSequence > sequence) {
// this buffer contains data that is already long gone - trim it
gBufferModule->remove_header(buffer, fFirstSequence - sequence);
sequence = fFirstSequence;
}
// find for the place where to insert the buffer into the queue
SegmentList::ReverseIterator iterator = fList.GetReverseIterator();
@ -161,6 +168,8 @@ BufferQueue::Add(net_buffer *buffer, tcp_sequence sequence)
status_t
BufferQueue::RemoveUntil(tcp_sequence sequence)
{
TRACE(("BufferQueue@%p::RemoveUntil(sequence %lu)\n", this, (uint32)sequence));
SegmentList::Iterator iterator = fList.GetIterator();
net_buffer *buffer = NULL;
while ((buffer = iterator.Next()) != NULL) {
@ -196,6 +205,8 @@ BufferQueue::RemoveUntil(tcp_sequence sequence)
status_t
BufferQueue::Get(net_buffer *buffer, tcp_sequence sequence, size_t bytes)
{
TRACE(("BufferQueue@%p::Get(sequence %lu, bytes %lu)\n", this, (uint32)sequence, bytes));
if (bytes == 0)
return B_OK;
@ -213,7 +224,7 @@ BufferQueue::Get(net_buffer *buffer, tcp_sequence sequence, size_t bytes)
SegmentList::Iterator iterator = fList.GetIterator();
net_buffer *source = NULL;
while ((source = iterator.Next()) != NULL) {
if (tcp_sequence(sequence + bytes) <= source->sequence)
if (sequence < source->sequence + source->size)
break;
}
@ -225,7 +236,7 @@ BufferQueue::Get(net_buffer *buffer, tcp_sequence sequence, size_t bytes)
uint32 offset = source->sequence - sequence;
while (source != NULL && bytesLeft > 0) {
size_t size = min_c(buffer->size - offset, bytesLeft);
size_t size = min_c(source->size - offset, bytesLeft);
status_t status = gBufferModule->append_cloned(buffer, source, offset, size);
if (status < B_OK)
return status;

View File

@ -36,7 +36,11 @@ class BufferQueue {
size_t Used() const { return fNumBytes; }
size_t Free() const { return fMaxBytes - fNumBytes; }
bool IsContiguous() const { return fNumBytes == fContiguousBytes; }
tcp_sequence FirstSequence() const { return fFirstSequence; }
tcp_sequence LastSequence() const { return fLastSequence; }
tcp_sequence NextSequence() const { return fFirstSequence + fContiguousBytes; }
private:
SegmentList fList;

View File

@ -30,6 +30,12 @@
#include <NetUtilities.h>
// Things this implementation currently doesn't implement:
// TCP, RFC 793
// SYN-Cache
// TCP Extensions for High Performance, RFC 1323
// SACK, Selective Acknowledgment - RFC 2018, RFC 2883
#define TRACE_TCP
#ifdef TRACE_TCP
# define TRACE(x) dprintf x
@ -88,8 +94,8 @@ TCPConnection::TCPConnection(net_socket *socket)
:
fSendWindowShift(0),
fReceiveWindowShift(0),
fLastAcknowledged(0), //system_time()),
fSendNext(fLastAcknowledged),
fSendUnacknowledged(0), //system_time()),
fSendNext(fSendUnacknowledged),
fSendWindow(0),
fSendQueue(socket->send.buffer_size),
fRoute(NULL),
@ -136,30 +142,23 @@ TCPConnection::InitCheck() const
inline bool
TCPConnection::_IsAcknowledgeValid(uint32 acknowledge)
TCPConnection::_IsAcknowledgeValid(tcp_sequence acknowledge) const
{
dprintf("last byte ackd %lu, next byte to send %lu, ack %lu\n", fLastAcknowledged, fSendNext, acknowledge);
return fLastAcknowledged > fSendNext
? acknowledge >= fLastAcknowledged || acknowledge <= fSendNext
: acknowledge >= fLastAcknowledged && acknowledge <= fSendNext;
return acknowledge > fSendUnacknowledged
&& acknowledge <= fSendMax;
}
/*!
If this method returns \c true, the sequence contains valid data with
regard to the receive window.
*/
inline bool
TCPConnection::_IsSequenceValid(uint32 sequence, uint32 length)
TCPConnection::_IsSequenceValid(tcp_sequence sequence, uint32 length) const
{
uint32 end = sequence + length;
if (fReceiveNext < fReceiveNext + TCP_MAX_RECV_BUF) {
return sequence >= fReceiveNext
&& end <= fReceiveNext + TCP_MAX_RECV_BUF;
}
// integer overflow
return (sequence >= fReceiveNext
|| sequence <= fReceiveNext + TCP_MAX_RECV_BUF)
&& (end >= fReceiveNext
|| end <= fReceiveNext + TCP_MAX_RECV_BUF);
tcp_sequence end = tcp_sequence(sequence + length - 1);
return (sequence >= fReceiveNext && sequence < fReceiveWindow + fReceiveWindow)
|| (end >= fReceiveNext && end <= fReceiveNext + fReceiveWindow);
}
@ -294,6 +293,8 @@ dprintf("************************* size = %ld, shift = %d\n", socket->receive.bu
return status;
}
fSendQueue.SetInitialSequence(fSendNext);
// wait until 3-way handshake is complete (if needed)
bigtime_t timeout = min_c(socket->send.timeout, TCP_CONNECTION_TIMEOUT);
@ -537,6 +538,8 @@ TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
TCP_FLAG_SYNCHRONIZE | TCP_FLAG_ACKNOWLEDGE, false);
//benaphore_unlock(&connection->fSendLock);
fSendQueue.SetInitialSequence(fSendNext);
if (status < B_OK)
return DROP;
@ -547,25 +550,29 @@ TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
int32
TCPConnection::SynchronizeSentReceive(tcp_segment_header& segment, net_buffer *buffer)
{
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& segment.acknowledge != fSendNext)
return DROP | RESET;
if (segment.flags & TCP_FLAG_RESET) {
fError = ECONNREFUSED;
fState = CLOSED;
return DROP;
}
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& !_IsAcknowledgeValid(segment.acknowledge))
return DROP | RESET;
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) == 0)
return DROP;
fReceiveNext = segment.sequence + 1;
fLastAcknowledged = segment.acknowledge;
segment.sequence++;
fSendUnacknowledged = segment.acknowledge + 1;
fReceiveNext = segment.sequence;
if (segment.flags & TCP_FLAG_ACKNOWLEDGE) {
// the connection has been established
fState = ESTABLISHED;
fReceiveQueue.SetInitialSequence(fReceiveNext);
release_sem_etc(fSendLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: this is not enough - we need to use B_RELEASE_ALL
} else {
@ -585,23 +592,136 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
// TODO: rethink locking!
// first check that the received sequence number is good
if (_IsSequenceValid(segment.sequence, buffer->size)) {
// If a valid RST was received, terminate the connection.
if (segment.flags & TCP_FLAG_RESET) {
fError = ECONNREFUSED;
fState = CLOSED;
return DROP;
uint32 advertisedWindow = (uint32)segment.advertised_window << fSendWindowShift;
// First, handle the most common case for uni-directional data transfer
// (known as header prediction - the segment must not change the window,
// and must be the expected sequence, and contain no control flags)
if (fState == ESTABLISHED
&& segment.AcknowledgeOnly()
&& advertisedWindow > 0 && advertisedWindow == fSendWindow
&& fSendNext == fSendMax) {
if (buffer->size == 0) {
// this is a pure acknowledge segment - we're on the sending end
if (fSendUnacknowledged < segment.acknowledge
&& fSendMax >= segment.acknowledge) {
// and it only acknowledges outstanding data
// TODO: update RTT estimators
fSendQueue.RemoveUntil(segment.acknowledge);
fSendUnacknowledged = segment.acknowledge;
// notify threads waiting on the socket to become writable again
release_sem_etc(fSendLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_WRITE, fSendWindow);
// if there is data left to be send, send it now
return _SendQueuedData(TCP_FLAG_ACKNOWLEDGE, false);
}
} else if (segment.acknowledge == fSendUnacknowledged
&& fReceiveQueue.IsContiguous()
&& fReceiveQueue.Free() >= buffer->size) {
// we're on the receiving end of the connection, and this segment
// is the one we were expecting, in-sequence
fReceiveQueue.Add(buffer, segment.sequence);
fReceiveNext += buffer->size;
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
return KEEP | ACKNOWLEDGE;
}
}
// The fast path was not applicable, so we continue with the standard processing
// of the incoming segment
// First have a look at the data being sent
if (segment.flags & TCP_FLAG_RESET) {
// is this a valid reset?
if (fLastAcknowledgeSent <= segment.sequence
&& tcp_sequence(segment.sequence) < fLastAcknowledgeSent + fReceiveWindow) {
// TODO: close connection depending on state
fError = ECONNREFUSED;
fState = CLOSED;
remove_connection(this);
}
return DROP;
}
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) != 0
|| (fState == SYNCHRONIZE_RECEIVED && !_IsAcknowledgeValid(segment.acknowledge))) {
// this is not allowed here, so we can reset the connection
return DROP | RESET;
}
fReceiveWindow = max_c(fReceiveQueue.Free(), fReceiveWindow);
// the window must not shrink
// trim buffer to be within the receive window
int32 drop = fReceiveNext - segment.sequence;;
if (drop > 0) {
if ((uint32)drop > buffer->size
|| ((uint32)drop == buffer->size && (segment.flags & TCP_FLAG_FINISH) == 0)) {
// don't accidently remove a FIN we shouldn't remove
segment.flags &= ~TCP_FLAG_FINISH;
drop = buffer->size;
}
// remove duplicate data at the start
gBufferModule->remove_header(buffer, drop);
segment.sequence += drop;
}
int32 action = KEEP;
drop = segment.sequence + buffer->size - (fReceiveNext + fReceiveWindow);
if (drop > 0) {
// remove data exceeding our window
if ((uint32)drop >= buffer->size) {
// if we can accept data, or the segment is not what we'd expect,
// drop the segment (an immediate acknowledge is always triggered)
if (fReceiveWindow != 0 || segment.sequence != fReceiveNext)
return DROP | IMMEDIATE_ACKNOWLEDGE;
action |= IMMEDIATE_ACKNOWLEDGE;
}
if ((segment.flags & TCP_FLAG_FINISH) != 0) {
// we need to remove the finish, too, as part of the data
drop--;
}
segment.flags &= ~(TCP_FLAG_FINISH | TCP_FLAG_PUSH);
gBufferModule->remove_trailer(buffer, drop);
}
// Then look at the acknowledgement for any updates
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0) {
// process acknowledged data
if (!_IsAcknowledgeValid(segment.acknowledge)) {
if (fSendUnacknowledged >= segment.acknowledge) {
}
}
}
if ((segment.flags & TCP_FLAG_RESET) != 0)
return DROP;
status_t status;
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& _IsAcknowledgeValid(segment.acknowledge)) {
fLastAcknowledged = segment.acknowledge;
if (fLastAcknowledged == fSendNext) {
fSendUnacknowledged = segment.acknowledge;
if (fSendUnacknowledged == fSendNext) {
// there is no data waiting to be processed
if (fState == WAIT_FOR_FINISH_ACKNOWLEDGE) {
fState = CLOSED;
status = remove_connection(this);
@ -614,11 +734,13 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
return DROP;
} else if (fState == FINISH_SENT)
fState = FINISH_ACKNOWLEDGED;
else if (fState == SYNCHRONIZE_RECEIVED)
else if (fState == SYNCHRONIZE_RECEIVED) {
fState = ESTABLISHED;
fReceiveQueue.SetInitialSequence(fReceiveNext);
}
}
fSendWindow = segment.advertised_window;
fSendWindow = advertisedWindow;
if (segment.flags & TCP_FLAG_FINISH) {
dprintf("peer is finishing connection!");
@ -634,7 +756,7 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
fState = TIME_WAIT;
break;
case FINISH_RECEIVED:
if (fLastAcknowledged == fSendNext) {
if (fSendUnacknowledged == fSendNext) {
fState = TIME_WAIT;
status = remove_connection(this);
if (status != B_OK)
@ -660,16 +782,14 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
// state machine is done switching states and the data is good.
// put it in the receive buffer
// TODO: This isn't the most efficient way to do it, and will need to be changed
// to deal with Silly Window Syndrome
if (buffer->size > 0)
fReceiveQueue.Add(buffer, segment.sequence);
else
gBufferModule->free(buffer);
if (fState != CLOSING && fState != WAIT_FOR_FINISH_ACKNOWLEDGE)
action &= ACKNOWLEDGE;
fReceiveNext = fReceiveQueue.NextSequence();
fReceiveWindow = fReceiveQueue.Free();
return action;
}
@ -708,6 +828,7 @@ TCPConnection::_SendQueuedData(uint16 flags, bool empty)
uint32 effectiveWindow = min_c(next->module->get_mtu(next,
(sockaddr *)&socket->address), fSendWindow);
uint32 available = fSendQueue.Available(fSendNext);
dprintf("fSendWindow = %lu, available = %lu\n", fSendWindow, available);
if (effectiveWindow > available)
effectiveWindow = available;
@ -721,6 +842,14 @@ TCPConnection::_SendQueuedData(uint16 flags, bool empty)
if (buffer == NULL)
return B_NO_MEMORY;
status_t status = B_OK;
if (effectiveWindow > 0)
fSendQueue.Get(buffer, fSendNext, effectiveWindow);
if (status < B_OK) {
gBufferModule->free(buffer);
return status;
}
gAddressModule->set_to((sockaddr *)&buffer->source, (sockaddr *)&socket->address);
gAddressModule->set_to((sockaddr *)&buffer->destination, (sockaddr *)&socket->peer);
TRACE(("TCP:%p.SendQueuedData() buffer %p, from address %s to %s\n", this,
@ -744,7 +873,7 @@ TCPConnection::_SendQueuedData(uint16 flags, bool empty)
//segment.window_shift = fReceiveWindowShift;
}
status_t status = add_tcp_header(segment, buffer);
status = add_tcp_header(segment, buffer);
if (status != B_OK) {
gBufferModule->free(buffer);
return status;

View File

@ -57,9 +57,8 @@ class TCPConnection : public net_protocol {
static int32 HashOffset() { return offsetof(TCPConnection, fHashNext); }
private:
bool _IsAcknowledgeValid(uint32 acknowledge);
bool _IsSequenceValid(uint32 sequence, uint32 length);
bool _IsAcknowledgeValid(tcp_sequence acknowledge) const;
bool _IsSequenceValid(tcp_sequence sequence, uint32 length) const;
status_t _SendQueuedData(uint16 flags, bool empty);
static void _TimeWait(struct net_timer *timer, void *data);
@ -73,16 +72,18 @@ class TCPConnection : public net_protocol {
uint8 fSendWindowShift;
uint8 fReceiveWindowShift;
uint32 fLastAcknowledged;
uint32 fSendNext;
tcp_sequence fSendUnacknowledged;
tcp_sequence fSendNext;
tcp_sequence fSendMax;
uint32 fSendWindow;
uint32 fMaxSegmentSize;
BufferQueue fSendQueue;
tcp_sequence fLastAcknowledgeSent;
net_route *fRoute;
// TODO: don't use a net_route, but a net_route_info!!!
uint32 fReceiveNext;
tcp_sequence fReceiveNext;
uint32 fReceiveWindow;
uint32 fMaxReceiveSize;
BufferQueue fReceiveQueue;

View File

@ -127,20 +127,14 @@ add_options(tcp_segment_header &segment, uint8 *buffer, size_t bufferSize)
bump_option(option, length);
}
if (length == 0) {
// no option defined
return 0;
}
while ((length + 1) & 0x3) {
// bump to a multiple of 4 length
option->kind = TCP_OPTION_NOP;
option = (tcp_option *)((uint8 *)option + 1);
length++;
if ((length & 3) == 0) {
// options completely fill out the option space
return length;
}
option->kind = TCP_OPTION_END;
return length + 1;
return (length + 3) & ~3;
// bump to a multiple of 4 length
}
@ -198,9 +192,6 @@ add_tcp_header(tcp_segment_header &segment, net_buffer *buffer)
void
process_options(tcp_segment_header &segment, net_buffer *buffer, int32 size)
{
segment.window_shift = 0;
segment.max_segment_size = 0;
if (size == 0)
return;
@ -559,7 +550,11 @@ tcp_receive_data(net_buffer *buffer)
segment.advertised_window = header.AdvertisedWindow();
segment.urgent_offset = header.UrgentOffset();
segment.flags = header.flags;
process_options(segment, buffer, headerLength - sizeof(tcp_header));
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) != 0) {
// for now, we only process the options in the SYN segment
// TODO: when we support timestamps, they could be handled specifically
process_options(segment, buffer, headerLength - sizeof(tcp_header));
}
bufferHeader.Remove(headerLength);
// we no longer need to keep the header around

View File

@ -71,15 +71,20 @@ struct tcp_header {
class tcp_sequence {
public:
tcp_sequence() {}
tcp_sequence(uint32 sequence) : number(sequence) {}
operator uint32() const { return number; }
void operator=(uint32 sequence) { number = sequence; }
bool operator>(uint32 sequence) const { return (int32)(number - sequence) > 0; }
bool operator>=(uint32 sequence) const { return (int32)(number - sequence) >= 0; }
bool operator<(uint32 sequence) const { return (int32)(number - sequence) < 0; }
bool operator<=(uint32 sequence) const { return (int32)(number - sequence) <= 0; }
uint32 operator+=(uint32 sequence) { return number += sequence; }
uint32& operator+=(uint32 sequence) { return number += sequence; }
uint32& operator++() { return ++number; }
uint32 operator++(int _) { return number++; }
private:
uint32 number;
@ -129,6 +134,12 @@ struct tcp_segment_header {
uint8 flags;
uint8 window_shift;
uint16 max_segment_size;
bool AcknowledgeOnly() const
{
return (flags & (TCP_FLAG_SYNCHRONIZE | TCP_FLAG_FINISH | TCP_FLAG_RESET
| TCP_FLAG_URGENT | TCP_FLAG_ACKNOWLEDGE)) == TCP_FLAG_ACKNOWLEDGE;
}
};
enum tcp_segment_action {