Some more work-in-progress:

* the TCPConnection::Receive() method is now more or less working as it should;
  of course, there are a number of missing things (like round-trip time estimation,
  retransmit timers, receive window update, ...).
* reply_with_reset() was broken, and accidently always send the segment it should
  answer with reset... (causing an endless loop during connect)
* BufferQueue::RemoveUntil() must always set the fFirstSequence member to the new
  sequence, or you will never be able to send anything with that queue (as the
  data in it would be no longer contiguous).
* connects, sendings (only single segments), and receiving data is working now
  basically (but very incomplete); retransmits or even subsequent transmits (if
  the data to be sent doesn't fit in one segment) doesn't work yet, so you better
  don't lose any segments :-)


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@19378 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2006-11-28 02:20:35 +00:00
parent a9a07b7215
commit 8535905415
5 changed files with 134 additions and 117 deletions

View File

@ -170,11 +170,14 @@ BufferQueue::RemoveUntil(tcp_sequence sequence)
{
TRACE(("BufferQueue@%p::RemoveUntil(sequence %lu)\n", this, (uint32)sequence));
fFirstSequence = sequence;
SegmentList::Iterator iterator = fList.GetIterator();
net_buffer *buffer = NULL;
while ((buffer = iterator.Next()) != NULL) {
if (sequence <= buffer->sequence) {
fFirstSequence = buffer->sequence;
// just in case there is a hole, how unlikely this may ever be
break;
}

View File

@ -32,9 +32,13 @@
// Things this implementation currently doesn't implement:
// TCP, RFC 793
// TCP Slow Start, Congestion Avoidance, Fast Retransmit, and Fast Recovery, RFC 2001, RFC 2581, RFC 3042
// NewReno Modification to TCP's Fast Recovery, RFC 2582
// Explicit Congestion Notification (ECN), RFC 3168
// SYN-Cache
// TCP Extensions for High Performance, RFC 1323
// SACK, Selective Acknowledgment - RFC 2018, RFC 2883
// SACK, Selective Acknowledgment - RFC 2018, RFC 2883, RFC 3517
// Forward RTO-Recovery, RFC 4138
#define TRACE_TCP
#ifdef TRACE_TCP
@ -49,10 +53,6 @@
// Estimate for Maximum segment lifetime in the internet
#define TCP_MAX_SEGMENT_LIFETIME (2 * TCP_INITIAL_RTT)
// keep maximum buffer sizes < max net_buffer size for now
#define TCP_MAX_SEND_BUF 1024
#define TCP_MAX_RECV_BUF TCP_MAX_SEND_BUF
struct tcp_segment {
struct list_link link;
@ -94,10 +94,12 @@ TCPConnection::TCPConnection(net_socket *socket)
:
fSendWindowShift(0),
fReceiveWindowShift(0),
fSendUnacknowledged(0), //system_time()),
fSendUnacknowledged(0),
fSendNext(fSendUnacknowledged),
fSendWindow(0),
fSendQueue(socket->send.buffer_size),
fInitialSendSequence(0),
fDuplicateAcknowledgeCount(0),
fRoute(NULL),
fReceiveNext(0),
fReceiveWindow(socket->receive.buffer_size),
@ -141,27 +143,6 @@ TCPConnection::InitCheck() const
}
inline bool
TCPConnection::_IsAcknowledgeValid(tcp_sequence acknowledge) const
{
return acknowledge > fSendUnacknowledged
&& acknowledge <= fSendMax;
}
/*!
If this method returns \c true, the sequence contains valid data with
regard to the receive window.
*/
inline bool
TCPConnection::_IsSequenceValid(tcp_sequence sequence, uint32 length) const
{
tcp_sequence end = tcp_sequence(sequence + length - 1);
return (sequence >= fReceiveNext && sequence < fReceiveWindow + fReceiveWindow)
|| (end >= fReceiveNext && end <= fReceiveNext + fReceiveWindow);
}
status_t
TCPConnection::Open()
{
@ -488,7 +469,7 @@ TCPConnection::UpdateTimeWait()
int32
TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
TCPConnection::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
{
// Essentially, we accept only TCP_FLAG_SYNCHRONIZE in this state,
// but the error behaviour differs
@ -523,6 +504,7 @@ TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
if (insert_connection(connection) < B_OK)
return DROP;
connection->fInitialReceiveSequence = segment.sequence;
connection->fState = SYNCHRONIZE_RECEIVED;
connection->fMaxReceiveSize = connection->fRoute->mtu - 40;
// 40 bytes for IP and TCP header without any options
@ -530,6 +512,10 @@ TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
connection->fReceiveNext = segment.sequence + 1;
// account for the extra sequence number for the synchronization
connection->fSendNext = connection->fInitialSendSequence;
connection->fSendUnacknowledged = connection->fSendNext;
connection->fSendMax = connection->fSendNext;
if (segment.max_segment_size > 0)
connection->fMaxSegmentSize = segment.max_segment_size;
@ -538,20 +524,25 @@ TCPConnection::ListenReceive(tcp_segment_header& segment, net_buffer *buffer)
TCP_FLAG_SYNCHRONIZE | TCP_FLAG_ACKNOWLEDGE, false);
//benaphore_unlock(&connection->fSendLock);
fSendQueue.SetInitialSequence(fSendNext);
connection->fInitialSendSequence = fSendNext;
connection->fSendQueue.SetInitialSequence(fSendNext);
if (status < B_OK)
return DROP;
segment.flags &= ~TCP_FLAG_SYNCHRONIZE;
// we handled this flag now, it must not be set for further processing
return connection->Receive(segment, buffer);
}
int32
TCPConnection::SynchronizeSentReceive(tcp_segment_header& segment, net_buffer *buffer)
TCPConnection::SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *buffer)
{
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& segment.acknowledge != fSendNext)
&& (fInitialSendSequence >= segment.acknowledge
|| fSendMax < segment.acknowledge))
return DROP | RESET;
if (segment.flags & TCP_FLAG_RESET) {
@ -567,6 +558,7 @@ TCPConnection::SynchronizeSentReceive(tcp_segment_header& segment, net_buffer *b
fSendUnacknowledged = segment.acknowledge + 1;
fReceiveNext = segment.sequence;
fInitialReceiveSequence = segment.sequence;
if (segment.flags & TCP_FLAG_ACKNOWLEDGE) {
// the connection has been established
@ -580,12 +572,15 @@ TCPConnection::SynchronizeSentReceive(tcp_segment_header& segment, net_buffer *b
fState = SYNCHRONIZE_RECEIVED;
}
segment.flags &= ~TCP_FLAG_SYNCHRONIZE;
// we handled this flag now, it must not be set for further processing
return Receive(segment, buffer) | IMMEDIATE_ACKNOWLEDGE;
}
int32
TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
TCPConnection::Receive(tcp_segment_header &segment, net_buffer *buffer)
{
TRACE(("TCP:%p.ReceiveData(): Connection in state %d received packet %p with flags 0x%x, seq %lu, ack %lu!\n",
this, fState, buffer, segment.flags, segment.sequence, segment.acknowledge));
@ -613,13 +608,16 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
fSendQueue.RemoveUntil(segment.acknowledge);
fSendUnacknowledged = segment.acknowledge;
// TODO: stop retransmit timer
// notify threads waiting on the socket to become writable again
release_sem_etc(fSendLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_WRITE, fSendWindow);
// if there is data left to be send, send it now
return _SendQueuedData(TCP_FLAG_ACKNOWLEDGE, false);
//return _SendQueuedData(TCP_FLAG_ACKNOWLEDGE, false);
return DROP;
}
} else if (segment.acknowledge == fSendUnacknowledged
&& fReceiveQueue.IsContiguous()
@ -655,8 +653,12 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
return DROP;
}
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) != 0
|| (fState == SYNCHRONIZE_RECEIVED && !_IsAcknowledgeValid(segment.acknowledge))) {
// this is not allowed here, so we can reset the connection
|| (fState == SYNCHRONIZE_RECEIVED
&& (fSendUnacknowledged > segment.acknowledge
|| fSendMax < segment.acknowledge
|| fInitialReceiveSequence > segment.sequence))) {
// reset the connection - either the initial SYN was faulty, or we received
// a SYN within the data stream
return DROP | RESET;
}
@ -664,7 +666,7 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
// the window must not shrink
// trim buffer to be within the receive window
int32 drop = fReceiveNext - segment.sequence;;
int32 drop = fReceiveNext - segment.sequence;
if (drop > 0) {
if ((uint32)drop > buffer->size
|| ((uint32)drop == buffer->size && (segment.flags & TCP_FLAG_FINISH) == 0)) {
@ -705,92 +707,102 @@ TCPConnection::Receive(tcp_segment_header& segment, net_buffer *buffer)
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0) {
// process acknowledged data
if (!_IsAcknowledgeValid(segment.acknowledge)) {
if (fSendUnacknowledged >= segment.acknowledge) {
}
if (fState == SYNCHRONIZE_RECEIVED) {
// TODO: window scaling!
gSocketModule->set_connected(socket);
//fReceiveQueue.SetInitialSequence(fReceiveNext);
fState = ESTABLISHED;
}
}
if ((segment.flags & TCP_FLAG_RESET) != 0)
return DROP;
status_t status;
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& _IsAcknowledgeValid(segment.acknowledge)) {
fSendUnacknowledged = segment.acknowledge;
if (fSendUnacknowledged == fSendNext) {
// there is no data waiting to be processed
if (fState == WAIT_FOR_FINISH_ACKNOWLEDGE) {
fState = CLOSED;
status = remove_connection(this);
if (status != B_OK)
return DROP;
} else if (fState == CLOSING) {
fState = TIME_WAIT;
status = remove_connection(this);
if (status != B_OK)
return DROP;
} else if (fState == FINISH_SENT)
fState = FINISH_ACKNOWLEDGED;
else if (fState == SYNCHRONIZE_RECEIVED) {
fState = ESTABLISHED;
fReceiveQueue.SetInitialSequence(fReceiveNext);
if (fSendMax < segment.acknowledge)
return DROP | IMMEDIATE_ACKNOWLEDGE;
if (fSendUnacknowledged >= segment.acknowledge) {
// this is a duplicate acknowledge
// TODO: handle this!
} else {
// this segment acknowleges in flight data
fDuplicateAcknowledgeCount = 0;
if (fSendMax == segment.acknowledge) {
// there is no outstanding data to be acknowledged
// TODO: stop retransmit timer
} else {
// TODO: set retransmit timer
}
}
fSendWindow = advertisedWindow;
fSendUnacknowledged = segment.acknowledge;
fSendQueue.RemoveUntil(segment.acknowledge);
if (segment.flags & TCP_FLAG_FINISH) {
dprintf("peer is finishing connection!");
fReceiveNext++;
if (fSendNext < fSendUnacknowledged)
fSendNext = fSendUnacknowledged;
// other side is closing connection; change states
switch (fState) {
case ESTABLISHED:
fState = FINISH_RECEIVED;
action |= IMMEDIATE_ACKNOWLEDGE;
break;
case FINISH_ACKNOWLEDGED:
fState = TIME_WAIT;
break;
case FINISH_RECEIVED:
if (fSendUnacknowledged == fSendNext) {
if (segment.acknowledge > fSendQueue.LastSequence()) {
// our TCP_FLAG_FINISH has been acknowledged
switch (fState) {
case FINISH_SENT:
fState = FINISH_ACKNOWLEDGED;
break;
case CLOSING:
fState = TIME_WAIT;
status = remove_connection(this);
if (status != B_OK)
return DROP;
break;
case WAIT_FOR_FINISH_ACKNOWLEDGE:
fState = CLOSED;
break;
//gStackModule->set_timer(&fTimer, TCP_MAX_SEGMENT_LIFETIME);
} else
fState = CLOSING;
break;
default:
break;
default:
break;
}
}
}
if (buffer->size > 0 || (segment.flags & (TCP_FLAG_SYNCHRONIZE | TCP_FLAG_FINISH)) != 0)
action |= ACKNOWLEDGE;
} else {
// out-of-order packet received, remind the other side of where we are
return DROP | ACKNOWLEDGE;
}
TRACE(("TCP %p.Receive():Entering state %d\n", this, fState));
// TODO: update window
fSendWindow = advertisedWindow;
// TODO: process urgent data!
// TODO: ignore data *after* FIN
if (segment.flags & TCP_FLAG_FINISH) {
dprintf("peer is finishing connection!");
fReceiveNext++;
// other side is closing connection; change states
switch (fState) {
case ESTABLISHED:
case SYNCHRONIZE_RECEIVED:
fState = FINISH_RECEIVED;
action |= IMMEDIATE_ACKNOWLEDGE;
break;
case FINISH_ACKNOWLEDGED:
fState = TIME_WAIT;
break;
case FINISH_RECEIVED:
// a second FIN?
break;
case FINISH_SENT:
// simultaneous close
fState = CLOSING;
break;
default:
break;
}
}
if (buffer->size > 0 || (segment.flags & (TCP_FLAG_SYNCHRONIZE | TCP_FLAG_FINISH)) != 0)
action |= ACKNOWLEDGE;
TRACE(("TCP %p.Receive():Entering state %d, segment action %ld\n", this, fState, action));
// state machine is done switching states and the data is good.
// put it in the receive buffer
if (buffer->size > 0)
if (buffer->size > 0) {
fReceiveNext += buffer->size;
fReceiveQueue.Add(buffer, segment.sequence);
else
} else
gBufferModule->free(buffer);
fReceiveNext = fReceiveQueue.NextSequence();
fReceiveWindow = fReceiveQueue.Free();
return action;
}
@ -828,7 +840,7 @@ TCPConnection::_SendQueuedData(uint16 flags, bool empty)
uint32 effectiveWindow = min_c(next->module->get_mtu(next,
(sockaddr *)&socket->address), fSendWindow);
uint32 available = fSendQueue.Available(fSendNext);
dprintf("fSendWindow = %lu, available = %lu\n", fSendWindow, available);
dprintf("fSendWindow = %lu, available = %lu, fSendNext = %lu\n", fSendWindow, available, (uint32)fSendNext);
if (effectiveWindow > available)
effectiveWindow = available;
@ -854,8 +866,8 @@ dprintf("fSendWindow = %lu, available = %lu\n", fSendWindow, available);
gAddressModule->set_to((sockaddr *)&buffer->destination, (sockaddr *)&socket->peer);
TRACE(("TCP:%p.SendQueuedData() buffer %p, from address %s to %s\n", this,
buffer,
AddressString(gDomain, (sockaddr *)&buffer->destination, true).Data(),
AddressString(gDomain, (sockaddr *)&buffer->source, true).Data()));
AddressString(gDomain, (sockaddr *)&buffer->source, true).Data(),
AddressString(gDomain, (sockaddr *)&buffer->destination, true).Data()));
uint32 size = buffer->size;
@ -881,13 +893,15 @@ dprintf("fSendWindow = %lu, available = %lu\n", fSendWindow, available);
// Only count 1 SYN, the 1 sent when transitioning from CLOSED or LISTEN
if ((flags & TCP_FLAG_SYNCHRONIZE) != 0)
fSendNext++;
size++;
// Only count 1 FIN, the 1 sent when transitioning from
// ESTABLISHED, SYNCHRONIZE_RECEIVED or FINISH_RECEIVED
if ((flags & TCP_FLAG_FINISH) != 0)
fSendNext++;
size++;
if (fSendMax == fSendNext)
fSendMax += size;
fSendNext += size;
#if 0

View File

@ -57,8 +57,6 @@ class TCPConnection : public net_protocol {
static int32 HashOffset() { return offsetof(TCPConnection, fHashNext); }
private:
bool _IsAcknowledgeValid(tcp_sequence acknowledge) const;
bool _IsSequenceValid(tcp_sequence sequence, uint32 length) const;
status_t _SendQueuedData(uint16 flags, bool empty);
static void _TimeWait(struct net_timer *timer, void *data);
@ -79,6 +77,8 @@ class TCPConnection : public net_protocol {
uint32 fMaxSegmentSize;
BufferQueue fSendQueue;
tcp_sequence fLastAcknowledgeSent;
tcp_sequence fInitialSendSequence;
uint32 fDuplicateAcknowledgeCount;
net_route *fRoute;
// TODO: don't use a net_route, but a net_route_info!!!
@ -87,6 +87,7 @@ class TCPConnection : public net_protocol {
uint32 fReceiveWindow;
uint32 fMaxReceiveSize;
BufferQueue fReceiveQueue;
tcp_sequence fInitialReceiveSequence;
// round trip time and retransmit timeout computation
int32 fRoundTripTime;

View File

@ -269,7 +269,7 @@ reply_with_reset(tcp_segment_header &segment, net_buffer *buffer)
} else
outSegment.sequence = segment.acknowledge;
status_t status = add_tcp_header(segment, reply);
status_t status = add_tcp_header(outSegment, reply);
if (status == B_OK)
status = gDomain->module->send_data(NULL, reply);
@ -566,8 +566,9 @@ tcp_receive_data(net_buffer *buffer)
(struct sockaddr *)&buffer->source);
if (connection != NULL) {
switch (connection->State()) {
case CLOSED:
case TIME_WAIT:
segmentAction |= IMMEDIATE_ACKNOWLEDGE;
case CLOSED:
connection->UpdateTimeWait();
break;
@ -600,8 +601,6 @@ tcp_receive_data(net_buffer *buffer)
if (segmentAction & RESET) {
// send reset
TRACE(("TCP: Connection does not exist!\n"));
reply_with_reset(segment, buffer);
}
if (segmentAction & DROP)

View File

@ -97,8 +97,8 @@ class tcp_sequence {
#define TCP_FLAG_PUSH 0x08
#define TCP_FLAG_ACKNOWLEDGE 0x10
#define TCP_FLAG_URGENT 0x20
#define TCP_FLAG_ECN 0x40 // Explicit Congestion Notification echo
#define TCP_FLAG_CWR 0x80 // Congestion Window Reduced
#define TCP_FLAG_CONGESTION_NOTIFICATION_ECHO 0x40
#define TCP_FLAG_CONGESTION_WINDOW_REDUCED 0x80
#define TCP_CONNECTION_TIMEOUT 75000000 // 75 secs