* TIME_WAIT endpoints must not send an acknowledgement for known data; this
  fixes the "endless discussions" when closing a local connection - this only
  happened on a simultaneous close.
* A FIN in TIME_WAIT now updates the time-wait timer, as required by the
  TCP specification.
* Entering TIME_WAIT now cancels all connection timers. We might want to
  think about putting TIME_WAIT connections into a separate hash, and deleting
  the socket early on.
* Added tracing support for send/receive, timers, and state changes.
* Cleanup.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@25236 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2008-04-29 07:39:58 +00:00
parent 5abeea69a5
commit 0cadc931d2
2 changed files with 236 additions and 79 deletions

View File

@ -10,7 +10,16 @@
#include "TCPEndpoint.h"
#include "EndpointManager.h"
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <new>
#include <stdlib.h>
#include <string.h>
#include <KernelExport.h>
#include <Select.h>
#include <net_buffer.h>
#include <net_datalink.h>
@ -19,19 +28,12 @@
#include <NetUtilities.h>
#include <lock.h>
#include <tracing.h>
#include <util/AutoLock.h>
#include <util/khash.h>
#include <util/list.h>
#include <KernelExport.h>
#include <Select.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <new>
#include <stdlib.h>
#include <string.h>
#include "EndpointManager.h"
// References:
@ -53,7 +55,7 @@
#define PrintAddress(address) \
AddressString(Domain(), address, true).Data()
//#define TRACE_TCP
#define TRACE_TCP
//#define PROBE_TCP
#ifdef TRACE_TCP
@ -77,6 +79,133 @@
# define PROBE(buffer, window) do { } while (0)
#endif
#if TCP_TRACING
namespace TCPTracing {
// Trace entry describing a received segment. All values are copied when the
// entry is constructed, so the dump shows the endpoint state and segment
// fields as they were at that moment.
class Receive : public AbstractTraceEntry {
public:
	// endpoint: endpoint that received the segment (its State() is recorded)
	// segment:  header providing sequence, acknowledge and flags
	// window:   window value to log; the caller in this file passes the
	//           advertised window shifted by the send window shift
	// buffer:   buffer holding the segment; only its address and size are kept
	Receive(TCPEndpoint* endpoint, tcp_segment_header& segment,
			uint32 window, net_buffer* buffer)
		:
		fEndpoint(endpoint),
		fBuffer(buffer),
		fBufferSize(buffer->size),
		fSequence(segment.sequence),
		fAcknowledge(segment.acknowledge),
		fWindow(window),
		fState(endpoint->State()),
		fFlags(segment.flags)
	{
		Initialized();
	}

	// Prints the recorded values as a single line.
	virtual void AddDump(TraceOutput& out)
	{
		out.Print("tcp:%p (%12s) receive buffer %p (%lu bytes), flags %x, "
			"seq %lu, ack %lu, wnd %lu",
			fEndpoint,
			name_for_state(fState),
			fBuffer,
			fBufferSize,
			fFlags,
			fSequence,
			fAcknowledge,
			fWindow);
	}

protected:
	TCPEndpoint*	fEndpoint;
	net_buffer*		fBuffer;
	uint32			fBufferSize;
	uint32			fSequence;
	uint32			fAcknowledge;
	uint32			fWindow;
	tcp_state		fState;
	uint8			fFlags;
};
// Trace entry describing a segment that is about to be sent. All values are
// copied when the entry is constructed.
class Send : public AbstractTraceEntry {
public:
	// endpoint:      sending endpoint (its State() is recorded)
	// segment:       header providing sequence, acknowledge and flags
	// buffer:        outgoing buffer; only its address and size are kept
	// firstSequence: logged as "first"; the caller in this file passes the
	//                send queue's FirstSequence()
	// lastSequence:  logged as "last"; the caller in this file passes the
	//                send queue's LastSequence()
	Send(TCPEndpoint* endpoint, tcp_segment_header& segment,
			net_buffer* buffer, uint32 firstSequence, uint32 lastSequence)
		:
		fEndpoint(endpoint),
		fBuffer(buffer),
		fBufferSize(buffer->size),
		fSequence(segment.sequence),
		fAcknowledge(segment.acknowledge),
		fFirstSequence(firstSequence),
		fLastSequence(lastSequence),
		fState(endpoint->State()),
		fFlags(segment.flags)
	{
		Initialized();
	}

	// Prints the recorded values as a single line.
	virtual void AddDump(TraceOutput& out)
	{
		out.Print("tcp:%p (%12s) send buffer %p (%lu bytes), flags %x, "
			"seq %lu, ack %lu, first %lu, last %lu",
			fEndpoint,
			name_for_state(fState),
			fBuffer,
			fBufferSize,
			fFlags,
			fSequence,
			fAcknowledge,
			fFirstSequence,
			fLastSequence);
	}

protected:
	TCPEndpoint*	fEndpoint;
	net_buffer*		fBuffer;
	uint32			fBufferSize;
	uint32			fSequence;
	uint32			fAcknowledge;
	uint32			fFirstSequence;
	uint32			fLastSequence;
	tcp_state		fState;
	uint8			fFlags;
};
// Trace entry marking a state change. It records the state the endpoint
// reports at construction time, so it has to be created after the new state
// has been assigned.
class State : public AbstractTraceEntry {
public:
	// endpoint: endpoint whose current State() is recorded
	State(TCPEndpoint* endpoint)
		:
		fEndpoint(endpoint),
		fState(endpoint->State())
	{
		Initialized();
	}

	// Prints the endpoint address and the name of the recorded state.
	virtual void AddDump(TraceOutput& out)
	{
		out.Print("tcp:%p (%12s) state change",
			fEndpoint,
			name_for_state(fState));
	}

protected:
	TCPEndpoint*	fEndpoint;
	tcp_state		fState;
};
// Trace entry noting that one of the endpoint's timers was invoked.
class Timer : public AbstractTraceEntry {
public:
	// endpoint: endpoint the timer belongs to (its State() is recorded)
	// which:    name of the timer. Only the pointer is stored, not a copy, so
	//           the string must remain valid while the entry can be dumped;
	//           the callers in this file pass string literals.
	Timer(TCPEndpoint* endpoint, const char* which)
		:
		fEndpoint(endpoint),
		fWhich(which),
		fState(endpoint->State())
	{
		Initialized();
	}

	// Prints the endpoint address, recorded state name and timer name.
	virtual void AddDump(TraceOutput& out)
	{
		out.Print("tcp:%p (%12s) %s timer",
			fEndpoint,
			name_for_state(fState),
			fWhich);
	}

protected:
	TCPEndpoint*	fEndpoint;
	const char*		fWhich;
	tcp_state		fState;
};
} // namespace TCPTracing
# define T(x) new(std::nothrow) TCPTracing::x
#else
# define T(x)
#endif // TCP_TRACING
// Initial estimate for packet round trip time (RTT)
#define TCP_INITIAL_RTT 2000000
@ -343,6 +472,7 @@ TCPEndpoint::Close()
if (fState == SYNCHRONIZE_SENT || fState == LISTEN) {
// TODO: what about linger in case of SYNCHRONIZE_SENT?
fState = CLOSED;
T(State(this));
return B_OK;
}
@ -434,11 +564,13 @@ TCPEndpoint::Connect(const sockaddr* address)
TRACE(" Connect(): starting 3-way handshake...");
fState = SYNCHRONIZE_SENT;
T(State(this));
// send SYN
status = _SendQueued();
if (status != B_OK) {
fState = CLOSED;
T(State(this));
return status;
}
@ -550,6 +682,7 @@ TCPEndpoint::Listen(int count)
}
fState = LISTEN;
T(State(this));
return B_OK;
}
@ -837,20 +970,21 @@ TCPEndpoint::_EnterTimeWait()
{
TRACE("_EnterTimeWait()\n");
_CancelConnectionTimers();
#if 0
if (fState == TIME_WAIT && fRoute != NULL
&& (fRoute->flags & RTF_LOCAL) != 0)
return;
#endif
gStackModule->set_timer(&fTimeWaitTimer, TCP_MAX_SEGMENT_LIFETIME << 1);
_UpdateTimeWait();
}
status_t
void
TCPEndpoint::_UpdateTimeWait()
{
return B_OK;
gStackModule->set_timer(&fTimeWaitTimer, TCP_MAX_SEGMENT_LIFETIME << 1);
}
@ -878,9 +1012,12 @@ TCPEndpoint::_Disconnect(bool closing)
else
return B_OK;
T(State(this));
status_t status = _SendQueued();
if (status != B_OK) {
fState = previousState;
T(State(this));
return status;
}
@ -892,6 +1029,7 @@ void
TCPEndpoint::_MarkEstablished()
{
fState = ESTABLISHED;
T(State(this));
if (socket->parent != NULL) {
gSocketModule->set_connected(socket);
@ -928,6 +1066,7 @@ TCPEndpoint::_HandleReset(status_t error)
socket->error = error;
fState = CLOSED;
T(State(this));
fSendList.Signal();
_NotifyReader();
@ -997,6 +1136,57 @@ TCPEndpoint::_NotifyReader()
}
/*!	Adds \a buffer to the receive queue at the sequence given by \a segment,
	and updates fReceiveNext to the queue's next expected sequence.
	If the segment carries the PUSH flag, the queue's push pointer is set.

	\return \c true if the receive queue has bytes available afterwards.
*/
bool
TCPEndpoint::_AddData(tcp_segment_header& segment, net_buffer* buffer)
{
	fReceiveQueue.Add(buffer, segment.sequence);
	fReceiveNext = fReceiveQueue.NextSequence();

	TRACE(" _AddData(): adding data, receive next = %lu. Now have %lu bytes.",
		(uint32)fReceiveNext, fReceiveQueue.Available());

	const bool pushRequested = (segment.flags & TCP_FLAG_PUSH) != 0;
	if (pushRequested)
		fReceiveQueue.SetPushPointer();

	const bool dataAvailable = fReceiveQueue.Available() > 0;
	return dataAvailable;
}
/*!	Initializes the receive side of the connection from the peer's
	synchronize \a segment.

	Note that \a segment is modified: its sequence number is advanced by one
	to account for the received SYN.
*/
void
TCPEndpoint::_PrepareReceivePath(tcp_segment_header& segment)
{
	// keep the peer's initial sequence number as it arrived
	fInitialReceiveSequence = segment.sequence;

	// count the received SYN
	segment.sequence++;

	fReceiveNext = segment.sequence;
	fReceiveQueue.SetInitialSequence(segment.sequence);

	const bool optionsEnabled = (fOptions & TCP_NOOPT) == 0;
	if (optionsEnabled) {
		// only take over the peer's maximum segment size if it sent one
		if (segment.max_segment_size > 0)
			fSendMaxSegmentSize = segment.max_segment_size;

		const bool hasWindowScale
			= (segment.options & TCP_HAS_WINDOW_SCALE) != 0;
		if (hasWindowScale) {
			fFlags |= FLAG_OPTION_WINDOW_SCALE;
			fSendWindowShift = segment.window_shift;
		} else {
			// no window scale option in the segment: clear the flag and
			// reset our receive window shift
			fFlags &= ~FLAG_OPTION_WINDOW_SCALE;
			fReceiveWindowShift = 0;
		}

		const bool hasTimestamps
			= (segment.options & TCP_HAS_TIMESTAMPS) != 0;
		if (hasTimestamps) {
			fFlags |= FLAG_OPTION_TIMESTAMP;
			fReceivedTimestamp = segment.timestamp_value;
		} else {
			fFlags &= ~FLAG_OPTION_TIMESTAMP;
		}
	}

	// start with a congestion window of two segments, and use the peer's
	// (scaled) advertised window as the slow start threshold
	fCongestionWindow = 2 * fSendMaxSegmentSize;
	fSlowStartThreshold
		= (uint32)segment.advertised_window << fSendWindowShift;
}
bool
TCPEndpoint::_ShouldReceive() const
{
@ -1018,6 +1208,7 @@ TCPEndpoint::_Spawn(TCPEndpoint* parent, tcp_segment_header& segment,
ProtocolSocket::Open();
fState = SYNCHRONIZE_RECEIVED;
T(State(this));
fManager = parent->fManager;
LocalAddress().SetTo(buffer->destination);
@ -1101,6 +1292,7 @@ TCPEndpoint::_SynchronizeSentReceive(tcp_segment_header &segment,
} else {
// simultaneous open
fState = SYNCHRONIZE_RECEIVED;
T(State(this));
}
segment.flags &= ~TCP_FLAG_SYNCHRONIZE;
@ -1260,7 +1452,7 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
if (fState == SYNCHRONIZE_RECEIVED)
_MarkEstablished();
if (fSendMax < segment.acknowledge || fState == TIME_WAIT)
if (fSendMax < segment.acknowledge)
return DROP | IMMEDIATE_ACKNOWLEDGE;
if (segment.acknowledge < fSendUnacknowledged) {
@ -1295,6 +1487,7 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
break;
case CLOSING:
fState = TIME_WAIT;
T(State(this));
_EnterTimeWait();
return DROP;
case WAIT_FOR_FINISH_ACKNOWLEDGE:
@ -1305,6 +1498,8 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
default:
break;
}
T(State(this));
}
if (fState != CLOSED)
@ -1338,7 +1533,7 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
// FIN implies PSH
fReceiveQueue.SetPushPointer();
// we'll reply immediatly to the FIN if we are not
// we'll reply immediately to the FIN if we are not
// transitioning to TIME WAIT so we immediately ACK it.
action |= IMMEDIATE_ACKNOWLEDGE;
@ -1347,15 +1542,22 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
case ESTABLISHED:
case SYNCHRONIZE_RECEIVED:
fState = FINISH_RECEIVED;
T(State(this));
break;
case FINISH_SENT:
// simultaneous close
fState = CLOSING;
T(State(this));
break;
case FINISH_ACKNOWLEDGED:
fState = TIME_WAIT;
T(State(this));
_EnterTimeWait();
break;
case TIME_WAIT:
_UpdateTimeWait();
break;
default:
break;
}
@ -1381,13 +1583,14 @@ TCPEndpoint::SegmentReceived(tcp_segment_header& segment, net_buffer* buffer)
{
MutexLocker locker(fLock);
TRACE("SegmentReceived(): buffer %p (%lu bytes) address %s to %s",
TRACE("SegmentReceived(): buffer %p (%lu bytes) address %s to %s\n"
"\tflags 0x%x, seq %lu, ack %lu, wnd %lu",
buffer, buffer->size, PrintAddress(buffer->source),
PrintAddress(buffer->destination));
TRACE(" flags 0x%x, seq %lu, ack %lu, wnd %lu",
segment.flags, segment.sequence, segment.acknowledge,
PrintAddress(buffer->destination), segment.flags, segment.sequence,
segment.acknowledge,
(uint32)segment.advertised_window << fSendWindowShift);
T(Receive(this, segment,
(uint32)segment.advertised_window << fSendWindowShift, buffer));
int32 segmentAction = DROP;
switch (fState) {
@ -1619,16 +1822,17 @@ TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
uint32 size = buffer->size;
segment.sequence = fSendNext;
TRACE("SendQueued(): buffer %p (%lu bytes) address %s to %s",
TRACE("SendQueued(): buffer %p (%lu bytes) address %s to %s\n"
"\tflags 0x%x, seq %lu, ack %lu, rwnd %hu, cwnd %lu, ssthresh %lu\n"
"\tlen %lu first %lu last %lu",
buffer, buffer->size, PrintAddress(buffer->source),
PrintAddress(buffer->destination));
TRACE(" flags 0x%x, seq %lu, ack %lu, rwnd %hu, cwnd %lu"
", ssthresh %lu", segment.flags, segment.sequence,
PrintAddress(buffer->destination), segment.flags, segment.sequence,
segment.acknowledge, segment.advertised_window,
fCongestionWindow, fSlowStartThreshold);
TRACE(" len %lu first %lu last %lu", segmentLength,
fCongestionWindow, fSlowStartThreshold, segmentLength,
(uint32)fSendQueue.FirstSequence(),
(uint32)fSendQueue.LastSequence());
T(Send(this, segment, buffer, fSendQueue.FirstSequence(),
fSendQueue.LastSequence()));
PROBE(buffer, sendWindow);
sendWindow -= buffer->size;
@ -1698,57 +1902,6 @@ TCPEndpoint::_MaxSegmentSize(const sockaddr* address) const
}
/*!	Adds \a buffer to the receive queue at the sequence given by \a segment,
	and updates fReceiveNext to the queue's next expected sequence.
	If the segment carries the PUSH flag, the queue's push pointer is set.

	\return \c true if the receive queue has bytes available afterwards.
*/
bool
TCPEndpoint::_AddData(tcp_segment_header& segment, net_buffer* buffer)
{
	fReceiveQueue.Add(buffer, segment.sequence);
	fReceiveNext = fReceiveQueue.NextSequence();

	TRACE(" _AddData(): adding data, receive next = %lu. Now have %lu bytes.",
		(uint32)fReceiveNext, fReceiveQueue.Available());

	const bool pushRequested = (segment.flags & TCP_FLAG_PUSH) != 0;
	if (pushRequested)
		fReceiveQueue.SetPushPointer();

	const bool dataAvailable = fReceiveQueue.Available() > 0;
	return dataAvailable;
}
/*!	Initializes the receive side of the connection from the peer's
	synchronize \a segment.

	Note that \a segment is modified: its sequence number is advanced by one
	to account for the received SYN.
*/
void
TCPEndpoint::_PrepareReceivePath(tcp_segment_header& segment)
{
	// keep the peer's initial sequence number as it arrived
	fInitialReceiveSequence = segment.sequence;

	// count the received SYN
	segment.sequence++;

	fReceiveNext = segment.sequence;
	fReceiveQueue.SetInitialSequence(segment.sequence);

	const bool optionsEnabled = (fOptions & TCP_NOOPT) == 0;
	if (optionsEnabled) {
		// only take over the peer's maximum segment size if it sent one
		if (segment.max_segment_size > 0)
			fSendMaxSegmentSize = segment.max_segment_size;

		const bool hasWindowScale
			= (segment.options & TCP_HAS_WINDOW_SCALE) != 0;
		if (hasWindowScale) {
			fFlags |= FLAG_OPTION_WINDOW_SCALE;
			fSendWindowShift = segment.window_shift;
		} else {
			// no window scale option in the segment: clear the flag and
			// reset our receive window shift
			fFlags &= ~FLAG_OPTION_WINDOW_SCALE;
			fReceiveWindowShift = 0;
		}

		const bool hasTimestamps
			= (segment.options & TCP_HAS_TIMESTAMPS) != 0;
		if (hasTimestamps) {
			fFlags |= FLAG_OPTION_TIMESTAMP;
			fReceivedTimestamp = segment.timestamp_value;
		} else {
			fFlags &= ~FLAG_OPTION_TIMESTAMP;
		}
	}

	// start with a congestion window of two segments, and use the peer's
	// (scaled) advertised window as the slow start threshold
	fCongestionWindow = 2 * fSendMaxSegmentSize;
	fSlowStartThreshold
		= (uint32)segment.advertised_window << fSendWindowShift;
}
status_t
TCPEndpoint::_PrepareSendPath(const sockaddr* peer)
{
@ -1883,6 +2036,7 @@ TCPEndpoint::_ResetSlowStart()
TCPEndpoint::_RetransmitTimer(net_timer* timer, void* _endpoint)
{
TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
T(Timer(endpoint, "retransmit"));
MutexLocker locker(endpoint->fLock);
if (!locker.IsLocked())
@ -1896,6 +2050,7 @@ TCPEndpoint::_RetransmitTimer(net_timer* timer, void* _endpoint)
TCPEndpoint::_PersistTimer(net_timer* timer, void* _endpoint)
{
TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
T(Timer(endpoint, "persist"));
MutexLocker locker(endpoint->fLock);
if (!locker.IsLocked())
@ -1913,6 +2068,7 @@ TCPEndpoint::_PersistTimer(net_timer* timer, void* _endpoint)
TCPEndpoint::_DelayedAcknowledgeTimer(net_timer* timer, void* _endpoint)
{
TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
T(Timer(endpoint, "delayed ack"));
MutexLocker locker(endpoint->fLock);
if (!locker.IsLocked())
@ -1930,6 +2086,7 @@ TCPEndpoint::_DelayedAcknowledgeTimer(net_timer* timer, void* _endpoint)
TCPEndpoint::_TimeWaitTimer(net_timer* timer, void* _endpoint)
{
TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
T(Timer(endpoint, "time-wait"));
gSocketModule->delete_socket(endpoint->socket);
}

View File

@ -84,7 +84,7 @@ public:
private:
void _StartPersistTimer();
void _EnterTimeWait();
status_t _UpdateTimeWait();
void _UpdateTimeWait();
void _CancelConnectionTimers();
uint8 _CurrentFlags();
bool _ShouldSendSegment(tcp_segment_header& segment,