diff --git a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp index c399a60e8a..81447e2bb1 100644 --- a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp +++ b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp @@ -116,6 +116,14 @@ segment_in_sequence(const tcp_segment_header &segment, int size, } +static inline bool +is_writable(tcp_state state) +{ + return state == SYNCHRONIZE_SENT || state == SYNCHRONIZE_RECEIVED + || state == ESTABLISHED || state == FINISH_RECEIVED; +} + + WaitList::WaitList(const char *name) { fSem = create_sem(0, name); @@ -180,6 +188,8 @@ TCPEndpoint::TCPEndpoint(net_socket *socket) fReceiveQueue(socket->receive.buffer_size), fRoundTripTime(TCP_INITIAL_RTT), fReceivedTSval(0), + fCongestionWindow(0), + fSlowStartThreshold(0), fState(CLOSED), fFlags(FLAG_OPTION_WINDOW_SCALE | FLAG_OPTION_TIMESTAMP), fError(B_OK), @@ -479,11 +489,11 @@ TCPEndpoint::Shutdown(int direction) status_t TCPEndpoint::SendData(net_buffer *buffer) { - TRACE("SendData(buffer %p, size %lu, flags %lx)", - buffer, buffer->size, buffer->flags); - RecursiveLocker lock(fLock); + TRACE("SendData(buffer %p, size %lu, flags %lx) [total %lu bytes]", + buffer, buffer->size, buffer->flags, fSendQueue.Size()); + if (fState == CLOSED) return ENOTCONN; else if (fState == LISTEN) { @@ -510,6 +520,8 @@ TCPEndpoint::SendData(net_buffer *buffer) fSendQueue.Add(buffer); } + TRACE(" SendData(): %lu bytes used.", fSendQueue.Used()); + if (fState == ESTABLISHED || fState == FINISH_RECEIVED) _SendQueued(); @@ -524,12 +536,10 @@ TCPEndpoint::SendAvailable() ssize_t available; - if (fState == FINISH_SENT || fState == FINISH_ACKNOWLEDGED - || fState == CLOSING || fState == WAIT_FOR_FINISH_ACKNOWLEDGE - || fState == TIME_WAIT || fState == LISTEN || fState == CLOSED) - available = EPIPE; - else + if (is_writable(fState)) available = fSendQueue.Free(); + else + available = EPIPE; TRACE("SendAvailable(): %li", available); return available; @@ -603,8 +613,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer) } } - TRACE(" ReadData(): read %lu bytes, %lu are available.", - numBytes, fReceiveQueue.Available()); + TRACE(" ReadData(): %lu are available.", fReceiveQueue.Available()); if (numBytes < fReceiveQueue.Available()) fReceiveList.Signal(); @@ -745,7 +754,10 @@ TCPEndpoint::Spawn(TCPEndpoint *parent, tcp_segment_header &segment, if (_PrepareSendPath((sockaddr *)&socket->peer) < B_OK) return DROP; - _PrepareReceivePath(parent, segment); + fOptions = parent->fOptions; + fAcceptSemaphore = parent->fAcceptSemaphore; + + _PrepareReceivePath(segment); // send SYN+ACK if (_SendQueued() < B_OK) @@ -777,7 +789,8 @@ TCPEndpoint::_SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *bu if ((segment.flags & TCP_FLAG_SYNCHRONIZE) == 0) return DROP; - _PrepareReceivePath(NULL, segment); + fSendUnacknowledged = segment.acknowledge; + _PrepareReceivePath(segment); if (segment.flags & TCP_FLAG_ACKNOWLEDGE) { _MarkEstablished(); @@ -859,21 +872,11 @@ TCPEndpoint::_SegmentReceived(tcp_segment_header &segment, net_buffer *buffer) TRACE("Receive(): header prediction send!"); // and it only acknowledges outstanding data - // TODO: update RTT estimators - - fSendQueue.RemoveUntil(segment.acknowledge); - fSendUnacknowledged = segment.acknowledge; + _Acknowledged(segment.acknowledge); // stop retransmit timer gStackModule->cancel_timer(&fRetransmitTimer); - // notify threads waiting on the socket to become writable again - fSendList.Signal(); - // TODO: real conditional locking needed! - gSocketModule->notify(socket, B_SELECT_WRITE, fSendWindow); - - // if there is data left to be send, send it now - _SendQueued(); return DROP; } } else if (segment.acknowledge == fSendUnacknowledged @@ -1017,7 +1020,12 @@ TCPEndpoint::_SendQueued(bool force) sendWindow = 1; } - int32 length = min_c(available, sendWindow) - (fSendNext - fSendUnacknowledged); + if (fCongestionWindow > 0) + sendWindow = min_c(sendWindow, fCongestionWindow); + + int32 length = min_c(available, sendWindow) + - (fSendNext - fSendUnacknowledged); + if (length < 0) { // either the window shrank, or we sent a still unacknowledged FIN length = 0; @@ -1091,9 +1099,10 @@ TCPEndpoint::_SendQueued(bool force) buffer, buffer->size, AddressString(Domain(), (sockaddr *)&buffer->source, true).Data(), AddressString(Domain(), (sockaddr *)&buffer->destination, true).Data()); - TRACE(" flags 0x%x, seq %lu, ack %lu, rwnd %hu", - segment.flags, segment.sequence, segment.acknowledge, - segment.advertised_window); + TRACE(" flags 0x%x, seq %lu, ack %lu, rwnd %hu, cwnd %lu" + ", ssthresh %lu", segment.flags, segment.sequence, + segment.acknowledge, segment.advertised_window, + fCongestionWindow, fSlowStartThreshold); status = add_tcp_header(AddressModule(), segment, buffer); if (status != B_OK) { @@ -1342,11 +1351,7 @@ TCPEndpoint::_Receive(tcp_segment_header &segment, net_buffer *buffer) gStackModule->set_timer(&fRetransmitTimer, 1000000LL); } - fSendUnacknowledged = segment.acknowledge; - fSendQueue.RemoveUntil(segment.acknowledge); - - if (fSendNext < fSendUnacknowledged) - fSendNext = fSendUnacknowledged; + _Acknowledged(segment.acknowledge); if (segment.acknowledge > fSendQueue.LastSequence() && fState > ESTABLISHED) { @@ -1496,24 +1501,16 @@ TCPEndpoint::_AddData(tcp_segment_header &segment, net_buffer *buffer) void -TCPEndpoint::_PrepareReceivePath(TCPEndpoint *parent, - tcp_segment_header &segment) +TCPEndpoint::_PrepareReceivePath(tcp_segment_header &segment) { fInitialReceiveSequence = segment.sequence; // count the received SYN segment.sequence++; - if (parent == NULL) - fSendUnacknowledged = segment.acknowledge; fReceiveNext = segment.sequence; fReceiveQueue.SetInitialSequence(segment.sequence); - if (parent) { - fOptions = parent->fOptions; - fAcceptSemaphore = parent->fAcceptSemaphore; - } - if ((fOptions & TCP_NOOPT) == 0) { if (segment.max_segment_size > 0) fSendMaxSegmentSize = segment.max_segment_size; @@ -1531,6 +1528,9 @@ TCPEndpoint::_PrepareReceivePath(TCPEndpoint *parent, else fFlags &= ~FLAG_OPTION_TIMESTAMP; } + + fCongestionWindow = 2 * fSendMaxSegmentSize; + fSlowStartThreshold = (uint32)segment.advertised_window << fSendWindowShift; } @@ -1571,6 +1571,61 @@ TCPEndpoint::_PrepareSendPath(const sockaddr *peer) } +void +TCPEndpoint::_Acknowledged(tcp_sequence acknowledge) +{ + size_t previouslyUsed = fSendQueue.Used(); + + fSendQueue.RemoveUntil(acknowledge); + fSendUnacknowledged = acknowledge; + + if (fSendNext < fSendUnacknowledged) + fSendNext = fSendUnacknowledged; + + // TODO: update RTT estimators + + if (fSendQueue.Used() < previouslyUsed) { + // this ACK acknowledged data + + if (is_writable(fState)) { + // notify threads waiting on the socket to become writable again + fSendList.Signal(); + gSocketModule->notify(socket, B_SELECT_WRITE, fSendQueue.Used()); + } + + if (fCongestionWindow < fSlowStartThreshold) + fCongestionWindow += fSendMaxSegmentSize; + } + + if (fCongestionWindow >= fSlowStartThreshold) { + uint32 increment = fSendMaxSegmentSize * fSendMaxSegmentSize; + + if (increment < fCongestionWindow) + increment = 1; + else + increment /= fCongestionWindow; + + fCongestionWindow += increment; + } + + // if there is data left to be send, send it now + _SendQueued(); +} + + +void +TCPEndpoint::_Retransmit() +{ + fSendNext = fSendUnacknowledged; + _SendQueued(); + fSendNext = fSendMax; + + fSlowStartThreshold = max_c((fSendMax - fSendUnacknowledged) / 2, + 2 * fSendMaxSegmentSize); + fCongestionWindow = fSendMaxSegmentSize; +} + + // #pragma mark - timer @@ -1583,9 +1638,7 @@ TCPEndpoint::_RetransmitTimer(net_timer *timer, void *data) if (!locker.IsLocked()) return; - endpoint->fSendNext = endpoint->fSendUnacknowledged; - endpoint->_SendQueued(); - endpoint->fSendNext = endpoint->fSendMax; + endpoint->_Retransmit(); } diff --git a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h index 699a32e8d0..9f1b1a0f92 100644 --- a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h +++ b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h @@ -102,9 +102,10 @@ class TCPEndpoint : public net_protocol { void _MarkEstablished(); status_t _WaitForEstablished(RecursiveLocker &lock, bigtime_t timeout); void _AddData(tcp_segment_header &segment, net_buffer *buffer); - void _PrepareReceivePath(TCPEndpoint *parent, - tcp_segment_header &segment); + void _PrepareReceivePath(tcp_segment_header &segment); status_t _PrepareSendPath(const sockaddr *peer); + void _Acknowledged(tcp_sequence acknowledge); + void _Retransmit(); static void _TimeWaitTimer(net_timer *timer, void *data); static void _RetransmitTimer(net_timer *timer, void *data);