TCP: Refactor sending logic.

* Break segment setup out into its own method.

 * Break the actual sending logic out into its own method.
  - While at it, remove some old/obsolete comments and
    rearrange some of the logic to match.

 * Separate the send-pure-ACK and send-data methods.
  - This way, the "force" parameter acts differently in each:
    passing "force" to _SendAcknowledge() may generate a
    duplicate ACK, while passing it to _SendQueued() will either
    send data smaller than a segment size, or do nothing.

Functional changes should be minor, and the code
meanwhile should be much easier to read.

Change-Id: I1e14b9a1e3b7c8b2d3bf8ae30f1369d8c9f662a4
Reviewed-on: https://review.haiku-os.org/c/haiku/+/7361
Reviewed-by: waddlesplash <waddlesplash@gmail.com>
This commit is contained in:
Augustin Cavalier 2024-01-29 21:48:26 -05:00 committed by waddlesplash
parent d2f65e76ee
commit 4fec81750d
3 changed files with 193 additions and 170 deletions

View File

@ -649,7 +649,7 @@ TCPEndpoint::Connect(const sockaddr* address)
T(State(this));
// send SYN
status = _SendQueued();
status = _SendAcknowledge();
if (status != B_OK) {
_Close();
return status;
@ -838,8 +838,7 @@ TCPEndpoint::SendData(net_buffer *buffer)
while (left > 0) {
while (fSendQueue.Free() < socket->send.low_water_mark) {
// initiate a send before waiting
if (fState == ESTABLISHED || fState == FINISH_RECEIVED)
_SendQueued();
_SendQueued();
// wait until enough space is available
status_t status = _WaitForCondition(fSendCondition, lock, timeout);
@ -890,8 +889,7 @@ TCPEndpoint::SendData(net_buffer *buffer)
if ((flags & MSG_EOF) != 0)
_Disconnect(false);
if (fState == ESTABLISHED || fState == FINISH_RECEIVED)
_SendQueued(force);
_SendQueued(force);
return B_OK;
}
@ -1035,7 +1033,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
// if we are opening the window, check if we should send an ACK
if (!clone)
SendAcknowledge(false);
_SendAcknowledge();
return receivedBytes;
}
@ -1156,13 +1154,6 @@ TCPEndpoint::DelayedAcknowledge()
}
status_t
TCPEndpoint::SendAcknowledge(bool force)
{
return _SendQueued(force, 0);
}
void
TCPEndpoint::_StartPersistTimer()
{
@ -1509,7 +1500,7 @@ TCPEndpoint::_Spawn(TCPEndpoint* parent, tcp_segment_header& segment,
_PrepareReceivePath(segment);
// send SYN+ACK
if (_SendQueued() != B_OK) {
if (_SendAcknowledge() != B_OK) {
T(Error(this, "sending failed", __LINE__));
return DROP;
}
@ -1598,8 +1589,7 @@ TCPEndpoint::_Receive(tcp_segment_header& segment, net_buffer* buffer)
return DROP | IMMEDIATE_ACKNOWLEDGE;
}
uint32 advertisedWindow = (uint32)segment.advertised_window
<< fSendWindowShift;
uint32 advertisedWindow = segment.AdvertisedWindow(fSendWindowShift);
size_t segmentLength = buffer->size;
// First, handle the most common case for uni-directional data transfer
@ -1938,7 +1928,7 @@ TCPEndpoint::SegmentReceived(tcp_segment_header& segment, net_buffer* buffer)
// process acknowledge action as asked for by the *Receive() method
if (segmentAction & IMMEDIATE_ACKNOWLEDGE)
SendAcknowledge(true);
_SendAcknowledge(true);
else if (segmentAction & ACKNOWLEDGE)
DelayedAcknowledge();
@ -1960,21 +1950,26 @@ TCPEndpoint::SegmentReceived(tcp_segment_header& segment, net_buffer* buffer)
// #pragma mark - send
inline uint8
TCPEndpoint::_CurrentFlags()
tcp_segment_header
TCPEndpoint::_PrepareSendSegment()
{
// we don't set FLAG_FINISH here, instead we do it
// conditionally below depending if we are sending
// the last bytes of the send queue.
uint8 flags = 0;
switch (fState) {
case CLOSED:
return TCP_FLAG_RESET | TCP_FLAG_ACKNOWLEDGE;
flags = TCP_FLAG_RESET | TCP_FLAG_ACKNOWLEDGE;
break;
case SYNCHRONIZE_SENT:
return TCP_FLAG_SYNCHRONIZE;
flags = TCP_FLAG_SYNCHRONIZE;
break;
case SYNCHRONIZE_RECEIVED:
return TCP_FLAG_SYNCHRONIZE | TCP_FLAG_ACKNOWLEDGE;
flags = TCP_FLAG_SYNCHRONIZE | TCP_FLAG_ACKNOWLEDGE;
break;
case ESTABLISHED:
case FINISH_RECEIVED:
@ -1983,11 +1978,134 @@ TCPEndpoint::_CurrentFlags()
case WAIT_FOR_FINISH_ACKNOWLEDGE:
case FINISH_SENT:
case CLOSING:
return TCP_FLAG_ACKNOWLEDGE;
flags = TCP_FLAG_ACKNOWLEDGE;
default:
return 0;
break;
}
tcp_segment_header segment(flags);
if ((fOptions & TCP_NOOPT) == 0) {
if ((fFlags & FLAG_OPTION_TIMESTAMP) != 0) {
segment.options |= TCP_HAS_TIMESTAMPS;
segment.timestamp_reply = fReceivedTimestamp;
segment.timestamp_value = tcp_now();
}
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) != 0
&& fSendNext == fInitialSendSequence) {
// add connection establishment options
segment.max_segment_size = fReceiveMaxSegmentSize;
if (fFlags & FLAG_OPTION_WINDOW_SCALE) {
segment.options |= TCP_HAS_WINDOW_SCALE;
segment.window_shift = fReceiveWindowShift;
}
if ((fFlags & FLAG_OPTION_SACK_PERMITTED) != 0)
segment.options |= TCP_SACK_PERMITTED;
}
// Attach SACK blocks whenever the receive queue has holes and the SACK
// option is enabled in fFlags.
if (!fReceiveQueue.IsContiguous()
	&& (fFlags & FLAG_OPTION_SACK_PERMITTED) != 0) {
	segment.options |= TCP_HAS_SACK;

	// One block less is reported when the timestamp option is also in use
	// (presumably because both share the limited TCP option space, see
	// RFC 2018). The conditional must be parenthesized: `-` binds tighter
	// than `?:`, so without the parentheses the whole expression would
	// evaluate to just 1 or 0 instead of MAX_SACK_BLKS minus one.
	int maxSackCount = MAX_SACK_BLKS
		- (((fFlags & FLAG_OPTION_TIMESTAMP) != 0) ? 1 : 0);

	memset(segment.sacks, 0, sizeof(segment.sacks));
	segment.sackCount = fReceiveQueue.PopulateSackInfo(fReceiveNext,
		maxSackCount, segment.sacks);
}
}
size_t availableBytes = fReceiveQueue.Free();
// window size must remain same for duplicate acknowledgements
if (!fReceiveQueue.IsContiguous())
availableBytes = (fReceiveMaxAdvertised - fReceiveNext).Number();
segment.SetAdvertisedWindow(availableBytes, fReceiveWindowShift);
segment.acknowledge = fReceiveNext.Number();
// Process urgent data
if (fSendUrgentOffset > fSendNext) {
segment.flags |= TCP_FLAG_URGENT;
segment.urgent_offset = (fSendUrgentOffset - fSendNext).Number();
} else {
fSendUrgentOffset = fSendUnacknowledged.Number();
// Keep urgent offset updated, so that it doesn't reach into our
// send window on overlap
segment.urgent_offset = 0;
}
return segment;
}
/*!	Stamps \a segment with the current send sequence, prepends the TCP header
	to \a buffer and passes the buffer on to the next protocol module.

	The endpoint's send state (fSendNext, fSendMax, fReceiveMaxAdvertised,
	round trip measurement, last acknowledge sent) is only updated once the
	buffer has been sent successfully. If adding the header or sending fails,
	\a buffer is freed here and the error is returned.

	\param segment The prepared segment header; its sequence number is set
		here, and for SYN segments the window scale option and the maximum
		segment size are cleared again after the header has been written.
	\param buffer The payload to send (may be empty for pure ACKs).
	\param isRetransmit If \c true, the segment is not used to start a new
		round trip time measurement.
	\return \c B_OK on success, the error of add_tcp_header() or
		send_routed_data() otherwise.
*/
status_t
TCPEndpoint::_PrepareAndSend(tcp_segment_header& segment, net_buffer* buffer,
	bool isRetransmit)
{
	LocalAddress().CopyTo(buffer->source);
	PeerAddress().CopyTo(buffer->destination);

	// Remember the payload size before the TCP header gets prepended.
	const uint32 payloadLength = buffer->size;
	uint32 sequenceLength = payloadLength;

	segment.sequence = fSendNext.Number();

	TRACE("_PrepareAndSend(): buffer %p (%" B_PRIu32 " bytes) address %s to "
		"%s flags %#" B_PRIx8 ", seq %" B_PRIu32 ", ack %" B_PRIu32
		", rwnd %" B_PRIu16 ", cwnd %" B_PRIu32 ", ssthresh %" B_PRIu32
		", len %" B_PRIu32 ", first %" B_PRIu32 ", last %" B_PRIu32,
		buffer, buffer->size, PrintAddress(buffer->source),
		PrintAddress(buffer->destination), segment.flags, segment.sequence,
		segment.acknowledge, segment.advertised_window,
		fCongestionWindow, fSlowStartThreshold, payloadLength,
		fSendQueue.FirstSequence().Number(),
		fSendQueue.LastSequence().Number());
	T(Send(this, segment, buffer, fSendQueue.FirstSequence(),
		fSendQueue.LastSequence()));
	// NOTE(review): no `sendWindow` is declared in this function, so this
	// presumably only builds while PROBE() discards its arguments - confirm
	// before enabling it.
	PROBE(buffer, sendWindow);

	status_t status = add_tcp_header(AddressModule(), segment, buffer);
	if (status != B_OK) {
		gBufferModule->free(buffer);
		return status;
	}

	// The segment flags are not touched below anymore, so evaluate them once.
	const bool hasSynchronize = (segment.flags & TCP_FLAG_SYNCHRONIZE) != 0;
	const bool hasFinish = (segment.flags & TCP_FLAG_FINISH) != 0;
	const bool hasAcknowledge = (segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0;

	// SYN and FIN each occupy one sequence number in addition to the payload.
	if (hasSynchronize) {
		// The header has been written already; presumably these are reset
		// so they are not repeated should the caller reuse the segment.
		segment.options &= ~TCP_HAS_WINDOW_SCALE;
		segment.max_segment_size = 0;
		sequenceLength++;
	}
	if (hasFinish)
		sequenceLength++;

	status = next->module->send_routed_data(next, fRoute, buffer);
	if (status < B_OK) {
		gBufferModule->free(buffer);
		return status;
	}

	// The segment is on its way, advance our send state.
	fSendNext += sequenceLength;
	if (fSendMax < fSendNext)
		fSendMax = fSendNext;

	fReceiveMaxAdvertised = fReceiveNext
		+ segment.AdvertisedWindow(fReceiveWindowShift);

	const bool hasPayload = payloadLength != 0;
	if (hasPayload && fState == ESTABLISHED)
		--fSendMaxSegments;

	// Start a round trip measurement if none is running; retransmits and
	// segments that carry neither data nor a SYN are not used for this.
	const bool startsMeasurement = fSendTime == 0 && !isRetransmit
		&& (hasPayload || hasSynchronize);
	if (startsMeasurement) {
		fSendTime = tcp_now();
		fRoundTripStartSequence = segment.sequence;
	}

	if (hasAcknowledge) {
		// This segment carried an ACK, a pending delayed one is obsolete.
		fLastAcknowledgeSent = segment.acknowledge;
		gStackModule->cancel_timer(&fDelayedAcknowledgeTimer);
	}

	return B_OK;
}
@ -2015,7 +2133,7 @@ TCPEndpoint::_ShouldSendSegment(tcp_segment_header& segment, uint32 length,
// check if we need to send a window update to the peer
if (segment.advertised_window > 0) {
// correct the window to take into account what already has been advertised
uint32 window = (segment.advertised_window << fReceiveWindowShift)
uint32 window = segment.AdvertisedWindow(fReceiveWindowShift)
- (fReceiveMaxAdvertised - fReceiveNext).Number();
// if we can advertise a window larger than twice the maximum segment
@ -2039,81 +2157,38 @@ TCPEndpoint::_ShouldSendSegment(tcp_segment_header& segment, uint32 length,
status_t
TCPEndpoint::_SendQueued(bool force)
TCPEndpoint::_SendAcknowledge(bool force)
{
return _SendQueued(force, fSendWindow);
if (fRoute == NULL || fState == LISTEN)
return B_ERROR;
tcp_segment_header segment = _PrepareSendSegment();
// Is there actually anything to do?
if (!force && fState == ESTABLISHED
&& fLastAcknowledgeSent == fReceiveNext
&& fReceiveQueue.IsContiguous()
&& !_ShouldSendSegment(segment, 0, 0, 0))
return B_OK;
net_buffer* buffer = gBufferModule->create(256);
if (buffer == NULL)
return B_NO_MEMORY;
return _PrepareAndSend(segment, buffer, false);
}
/*! Sends one or more TCP segments with the data waiting in the queue, or some
specific flags that need to be sent.
*/
/*! Sends one or more TCP segments with the data waiting in the queue. */
status_t
TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
TCPEndpoint::_SendQueued(bool force)
{
if (fRoute == NULL)
if (fRoute == NULL || fState < ESTABLISHED)
return B_ERROR;
// in passive state?
if (fState == LISTEN)
return B_ERROR;
tcp_segment_header segment(_CurrentFlags());
if ((fOptions & TCP_NOOPT) == 0) {
if ((fFlags & FLAG_OPTION_TIMESTAMP) != 0) {
segment.options |= TCP_HAS_TIMESTAMPS;
segment.timestamp_reply = fReceivedTimestamp;
segment.timestamp_value = tcp_now();
}
// SACK information is embedded with duplicate acknowledgements
if (!fReceiveQueue.IsContiguous()
&& fLastAcknowledgeSent <= fReceiveNext
&& (fFlags & FLAG_OPTION_SACK_PERMITTED) != 0) {
segment.options |= TCP_HAS_SACK;
int maxSackCount = MAX_SACK_BLKS
- ((fFlags & FLAG_OPTION_TIMESTAMP) != 0);
memset(segment.sacks, 0, sizeof(segment.sacks));
segment.sackCount = fReceiveQueue.PopulateSackInfo(fReceiveNext,
maxSackCount, segment.sacks);
}
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) != 0
&& fSendNext == fInitialSendSequence) {
// add connection establishment options
segment.max_segment_size = fReceiveMaxSegmentSize;
if (fFlags & FLAG_OPTION_WINDOW_SCALE) {
segment.options |= TCP_HAS_WINDOW_SCALE;
segment.window_shift = fReceiveWindowShift;
}
if ((fFlags & FLAG_OPTION_SACK_PERMITTED) != 0)
segment.options |= TCP_SACK_PERMITTED;
}
}
size_t availableBytes = fReceiveQueue.Free();
// window size must remain same for duplicate acknowledgements
if (!fReceiveQueue.IsContiguous())
availableBytes = (fReceiveMaxAdvertised - fReceiveNext).Number();
if (fFlags & FLAG_OPTION_WINDOW_SCALE)
availableBytes >>= fReceiveWindowShift;
segment.advertised_window = min_c(TCP_MAX_WINDOW, availableBytes);
segment.acknowledge = fReceiveNext.Number();
// Process urgent data
if (fSendUrgentOffset > fSendNext) {
segment.flags |= TCP_FLAG_URGENT;
segment.urgent_offset = (fSendUrgentOffset - fSendNext).Number();
} else {
fSendUrgentOffset = fSendUnacknowledged.Number();
// Keep urgent offset updated, so that it doesn't reach into our
// send window on overlap
segment.urgent_offset = 0;
}
tcp_segment_header segment = _PrepareSendSegment();
uint32 sendWindow = fSendWindow;
if (fCongestionWindow > 0 && fCongestionWindow < sendWindow)
sendWindow = fCongestionWindow;
@ -2141,6 +2216,11 @@ TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
sendWindow -= consumedWindow;
uint32 length = min_c(fSendQueue.Available(fSendNext), sendWindow);
if (length == 0) {
// Nothing to send.
return B_OK;
}
bool shouldStartRetransmitTimer = fSendNext == fSendUnacknowledged;
bool retransmit = fSendNext < fSendMax;
@ -2154,7 +2234,7 @@ TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
- tcp_options_length(segment);
uint32 segmentLength = min_c(length, segmentMaxSize);
if (fSendNext + segmentLength == fSendQueue.LastSequence() && !force) {
if ((fSendNext + segmentLength) == fSendQueue.LastSequence() && !force) {
if (state_needs_finish(fState))
segment.flags |= TCP_FLAG_FINISH;
if (length > 0)
@ -2183,74 +2263,13 @@ TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
return status;
}
LocalAddress().CopyTo(buffer->source);
PeerAddress().CopyTo(buffer->destination);
uint32 size = buffer->size;
segment.sequence = fSendNext.Number();
TRACE("SendQueued(): buffer %p (%" B_PRIu32 " bytes) address %s to "
"%s flags %#" B_PRIx8 ", seq %" B_PRIu32 ", ack %" B_PRIu32
", rwnd %" B_PRIu16 ", cwnd %" B_PRIu32 ", ssthresh %" B_PRIu32
", len %" B_PRIu32 ", first %" B_PRIu32 ", last %" B_PRIu32,
buffer, buffer->size, PrintAddress(buffer->source),
PrintAddress(buffer->destination), segment.flags, segment.sequence,
segment.acknowledge, segment.advertised_window,
fCongestionWindow, fSlowStartThreshold, segmentLength,
fSendQueue.FirstSequence().Number(),
fSendQueue.LastSequence().Number());
T(Send(this, segment, buffer, fSendQueue.FirstSequence(),
fSendQueue.LastSequence()));
PROBE(buffer, sendWindow);
sendWindow -= buffer->size;
status = add_tcp_header(AddressModule(), segment, buffer);
if (status != B_OK) {
gBufferModule->free(buffer);
status = _PrepareAndSend(segment, buffer, retransmit);
if (status != B_OK)
return status;
}
// Update send status - we need to do this before we send the data
// for local connections as the answer is directly handled
if (segment.flags & TCP_FLAG_SYNCHRONIZE) {
segment.options &= ~TCP_HAS_WINDOW_SCALE;
segment.max_segment_size = 0;
size++;
}
if (segment.flags & TCP_FLAG_FINISH)
size++;
uint32 sendMax = fSendMax.Number();
fSendNext += size;
if (fSendMax < fSendNext)
fSendMax = fSendNext;
fReceiveMaxAdvertised = fReceiveNext
+ ((uint32)segment.advertised_window << fReceiveWindowShift);
if (segmentLength != 0 && fState == ESTABLISHED)
--fSendMaxSegments;
status = next->module->send_routed_data(next, fRoute, buffer);
if (status < B_OK) {
gBufferModule->free(buffer);
fSendNext = segment.sequence;
fSendMax = sendMax;
// restore send status
return status;
}
if (fSendTime == 0 && !retransmit
&& (segmentLength != 0 || (segment.flags & TCP_FLAG_SYNCHRONIZE) !=0)) {
fSendTime = tcp_now();
fRoundTripStartSequence = segment.sequence;
}
if (shouldStartRetransmitTimer && size > 0) {
if (shouldStartRetransmitTimer) {
TRACE("starting initial retransmit timer of: %" B_PRIdBIGTIME,
fRetransmitTimeout);
gStackModule->set_timer(&fRetransmitTimer, fRetransmitTimeout);
@ -2258,11 +2277,6 @@ TCPEndpoint::_SendQueued(bool force, uint32 sendWindow)
shouldStartRetransmitTimer = false;
}
if (segment.flags & TCP_FLAG_ACKNOWLEDGE) {
fLastAcknowledgeSent = segment.acknowledge;
gStackModule->cancel_timer(&fDelayedAcknowledgeTimer);
}
length -= segmentLength;
segment.flags &= ~(TCP_FLAG_SYNCHRONIZE | TCP_FLAG_RESET
| TCP_FLAG_FINISH);
@ -2507,8 +2521,6 @@ TCPEndpoint::_PersistTimer(net_timer* timer, void* _endpoint)
// the timer might not have been canceled early enough
if (endpoint->State() == CLOSED)
return;
if (endpoint->fSendQueue.Available(endpoint->fSendNext) == 0)
return;
endpoint->_SendQueued(true);
}
@ -2527,10 +2539,8 @@ TCPEndpoint::_DelayedAcknowledgeTimer(net_timer* timer, void* _endpoint)
// the timer might not have been canceled early enough
if (endpoint->State() == CLOSED)
return;
if (endpoint->fLastAcknowledgeSent == endpoint->fReceiveNext)
return;
endpoint->SendAcknowledge(true);
endpoint->_SendAcknowledge();
}

View File

@ -61,7 +61,6 @@ public:
bool IsLocal() const;
status_t DelayedAcknowledge();
status_t SendAcknowledge(bool force);
int32 SegmentReceived(tcp_segment_header& segment,
net_buffer* buffer);
@ -74,13 +73,16 @@ private:
void _UpdateTimeWait();
void _Close();
void _CancelConnectionTimers();
uint8 _CurrentFlags();
tcp_segment_header _PrepareSendSegment();
bool _ShouldSendSegment(tcp_segment_header& segment,
uint32 length, uint32 segmentMaxSize,
uint32 flightSize);
status_t _PrepareAndSend(tcp_segment_header& segment, net_buffer* buffer,
bool isRetransmit);
status_t _SendAcknowledge(bool force = false);
status_t _SendQueued(bool force = false);
status_t _SendQueued(bool force, uint32 sendWindow);
int _MaxSegmentSize(const struct sockaddr* address) const;
status_t _Disconnect(bool closing);
ssize_t _AvailableData() const;
void _NotifyReader();
@ -103,6 +105,7 @@ private:
bigtime_t timeout);
bool _AddData(tcp_segment_header& segment,
net_buffer* buffer);
int _MaxSegmentSize(const struct sockaddr* address) const;
void _PrepareReceivePath(tcp_segment_header& segment);
status_t _PrepareSendPath(const sockaddr* peer);
void _Acknowledged(tcp_segment_header& segment);

View File

@ -266,6 +266,16 @@ struct tcp_segment_header {
return (flags & (TCP_FLAG_SYNCHRONIZE | TCP_FLAG_FINISH | TCP_FLAG_RESET
| TCP_FLAG_URGENT | TCP_FLAG_ACKNOWLEDGE)) == TCP_FLAG_ACKNOWLEDGE;
}
/*!	Returns the advertised window of this segment scaled up by
	\a windowShift.
*/
uint32 AdvertisedWindow(uint8 windowShift) const
{
	// Widen to 32 bits first, so that the shift is done on an unsigned
	// 32 bit value.
	const uint32 unscaledWindow = advertised_window;
	return unscaledWindow << windowShift;
}
/*!	Stores \a availableBytes, scaled down by \a windowShift and capped at
	TCP_MAX_WINDOW, as the advertised window of this segment.
*/
void SetAdvertisedWindow(size_t availableBytes, uint8 windowShift)
{
	const size_t scaledWindow = availableBytes >> windowShift;
	advertised_window = min_c(TCP_MAX_WINDOW, scaledWindow);
}
};
enum tcp_segment_action {