* Header prediction was a bit too generous (and thus used too often)

* SendData() could return an error while still taking ownership of the buffer
  passed in; errors are now only returned as long as the caller still owns
  the buffer (see the sketch after this list)
* Removed the timeout computations from SendData() - it looks like this is
  usually done the simple way when the data has to be submitted in smaller
  packets.
* ReadData() no longer blocks in case the peer has closed the connection.
* More debug output.
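
As a rough illustration of the new SendData() ownership rule (a minimal
standalone sketch, not the Haiku code - SendChunked() and send_chunk() are
made-up names, and status_t/B_OK are redeclared so the snippet compiles on
its own): an error is only propagated while the remainder of the data still
belongs to the caller, because returning an error hands the buffer back to it.

#include <cstddef>
#include <cstdint>
#include <vector>

typedef int status_t;
const status_t B_OK = 0;

// Placeholder for the real transmit path (split + queue + _SendQueued());
// it exists only so the sketch is self-contained.
static status_t
send_chunk(const uint8_t* /*data*/, size_t /*size*/)
{
    return B_OK;
}

// Models the rule from the SendData() hunk below: once the last piece of the
// original buffer has been handed over, errors may no longer be returned,
// since the caller would then dispose of a buffer it no longer owns.
status_t
SendChunked(const std::vector<uint8_t>& buffer, size_t maxSegmentSize)
{
    size_t offset = 0;
    size_t bytesLeft = buffer.size();

    while (bytesLeft > 0) {
        size_t chunkSize = bytesLeft > maxSegmentSize ? maxSegmentSize : bytesLeft;
        bool lastChunk = chunkSize == bytesLeft;
            // corresponds to "buffer == chunk" in the real code

        status_t status = send_chunk(&buffer[offset], chunkSize);
        if (status != B_OK && !lastChunk) {
            // The rest of the buffer is still the caller's, so reporting
            // the error (and letting the caller free it) is safe.
            return status;
        }

        offset += chunkSize;
        bytesLeft -= chunkSize;
    }
    return B_OK;
}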


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@19430 a95241bf-73f2-0310-859d-f6bbb57e9c96
Axel Dörfler 2006-12-05 02:26:13 +00:00
parent d344e18b94
commit 40eb4ed30d


@@ -55,6 +55,7 @@
enum {
FLAG_OPTION_WINDOW_SHIFT = 0x01,
FLAG_OPTION_TIMESTAMP = 0x02,
FLAG_NO_RECEIVE = 0x04,
};
@@ -367,10 +368,9 @@ TCPEndpoint::Shutdown(int direction)
status_t
TCPEndpoint::SendData(net_buffer *buffer)
{
TRACE(("TCP:%p.SendData()\n", this));
TRACE(("TCP:%p.SendData(buffer %p, size %lu)\n", this, buffer, buffer->size));
size_t bytesLeft = buffer->size;
bigtime_t timeout = socket->send.timeout;
do {
net_buffer *chunk;
@@ -380,6 +380,7 @@ TCPEndpoint::SendData(net_buffer *buffer)
* fSendMaxSegmentSize;
chunk = gBufferModule->split(buffer, chunkSize);
dprintf(" TCP::Send() split buffer at %lu (buffer size %lu, mss %lu) -> %p\n", chunkSize, socket->send.buffer_size, fSendMaxSegmentSize, chunk);
if (chunk == NULL)
return B_NO_MEMORY;
} else
@@ -389,15 +390,12 @@ TCPEndpoint::SendData(net_buffer *buffer)
while (fSendQueue.Free() < chunk->size) {
recursive_lock_unlock(&fLock);
bigtime_t now = system_time();
status_t status = acquire_sem_etc(fSendLock, 1,
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, timeout);
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, socket->send.timeout);
if (status < B_OK)
return status;
if (timeout > 0)
timeout -= system_time() - now;
recursive_lock_lock(&fLock);
}
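
The removed timeout arithmetic above simply becomes one full
socket->send.timeout per wait. A minimal standalone sketch of that simpler
scheme, using std::condition_variable in place of the kernel semaphore
(SendQueueWaiter and its members are made up for the example):

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Waits until the send queue has room for "needed" bytes; every wait gets
// the full per-socket send timeout instead of a remaining-time counter,
// mirroring the change from a decremented "timeout" to socket->send.timeout.
struct SendQueueWaiter {
    std::mutex lock;
    std::condition_variable spaceAvailable;
    size_t freeBytes = 0;

    // Returns false if a wait ran into the timeout before enough space was
    // freed (comparable to acquire_sem_etc() returning an error).
    bool WaitForSpace(size_t needed, std::chrono::microseconds sendTimeout)
    {
        std::unique_lock<std::mutex> locker(lock);
        while (freeBytes < needed) {
            if (spaceAvailable.wait_for(locker, sendTimeout)
                    == std::cv_status::timeout)
                return false;
        }
        return true;
    }
};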
@@ -408,17 +406,21 @@ TCPEndpoint::SendData(net_buffer *buffer)
return B_OK;
}
size_t chunkSize = chunk->size;
fSendQueue.Add(chunk);
status_t status = _SendQueued();
recursive_lock_unlock(&fLock);
if (status < B_OK)
return status;
if (timeout < 0)
return B_WOULD_BLOCK;
if (buffer != chunk) {
// as long as we didn't eat the buffer, we can still return an error code
// (we don't own the buffer if we return an error code)
if (status < B_OK)
return status;
}
bytesLeft -= chunk->size;
bytesLeft -= chunkSize;
} while (bytesLeft > 0);
return B_OK;
@@ -438,7 +440,7 @@ TCPEndpoint::SendAvailable()
status_t
TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
{
TRACE(("TCP:%p.ReadData()\n", this));
TRACE(("TCP:%p.ReadData(%lu bytes)\n", this, numBytes));
//BenaphoreLocker lock(&fReceiveLock);
@@ -453,6 +455,12 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
return status;
}
if ((fFlags & FLAG_NO_RECEIVE) != 0 && fReceiveQueue.Available() == 0) {
// there is no data left in the queue, and we can't receive anything anymore
*_buffer = NULL;
return B_OK;
}
// read data out of buffer
// TODO: add support for urgent data (MSG_OOB)
// TODO: wait until enough bytes are available
@@ -462,9 +470,15 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, socket->receive.timeout);
if (status < B_OK)
return status;
} while (fReceiveQueue.Available() < socket->receive.low_water_mark);
} while (fReceiveQueue.Available() < socket->receive.low_water_mark
&& (fFlags & FLAG_NO_RECEIVE) == 0);
RecursiveLocker locker(fLock);
dprintf("read %lu bytes, %lu are available\n", numBytes, fReceiveQueue.Available());
if (numBytes < fReceiveQueue.Available())
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
return fReceiveQueue.Get(numBytes, (flags & MSG_PEEK) == 0, _buffer);
}
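
The FLAG_NO_RECEIVE handling above is what keeps ReadData() from blocking once
the peer has closed the connection. A minimal standalone model of that
behaviour (ReceiveQueueModel and its members are invented for the sketch; the
receive timeout of the real code is left out for brevity):

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <vector>

struct ReceiveQueueModel {
    std::mutex lock;
    std::condition_variable dataArrived;
    std::deque<uint8_t> queue;
    size_t lowWaterMark = 1;
    bool noReceive = false;  // plays the role of FLAG_NO_RECEIVE

    // Called when a FIN arrives: no more data will show up, wake readers.
    void PeerClosed()
    {
        std::lock_guard<std::mutex> locker(lock);
        noReceive = true;
        dataArrived.notify_all();
    }

    // An empty result with noReceive set signals end of stream instead of
    // blocking forever on a connection the peer has already closed.
    std::vector<uint8_t> Read(size_t numBytes)
    {
        std::unique_lock<std::mutex> locker(lock);
        dataArrived.wait(locker, [&] {
            return queue.size() >= lowWaterMark || noReceive;
        });

        size_t count = numBytes < queue.size() ? numBytes : queue.size();
        std::vector<uint8_t> result(queue.begin(), queue.begin() + count);
        queue.erase(queue.begin(), queue.begin() + count);
        return result;
    }
};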
@@ -688,8 +702,8 @@ TCPEndpoint::SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *buf
int32
TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
{
TRACE(("TCP:%p.ReceiveData(): Connection in state %d received packet %p with flags 0x%x, seq %lu, ack %lu!\n",
this, fState, buffer, segment.flags, segment.sequence, segment.acknowledge));
TRACE(("TCP:%p.ReceiveData(): Connection in state %d received packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!\n",
this, fState, buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge));
// TODO: rethink locking!
@@ -701,12 +715,14 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (fState == ESTABLISHED
&& segment.AcknowledgeOnly()
&& fReceiveNext == segment.sequence
&& advertisedWindow > 0 && advertisedWindow == fSendWindow
&& fSendNext == fSendMax) {
if (buffer->size == 0) {
// this is a pure acknowledge segment - we're on the sending end
if (fSendUnacknowledged < segment.acknowledge
&& fSendMax >= segment.acknowledge) {
dprintf("header prediction send!\n");
// and it only acknowledges outstanding data
// TODO: update RTT estimators
@@ -729,9 +745,11 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
} else if (segment.acknowledge == fSendUnacknowledged
&& fReceiveQueue.IsContiguous()
&& fReceiveQueue.Free() >= buffer->size) {
dprintf("header prediction receive!\n");
// we're on the receiving end of the connection, and this segment
// is the one we were expecting, in-sequence
fReceiveNext += buffer->size;
dprintf("receive next = %lu!\n", (uint32)fReceiveNext);
fReceiveQueue.Add(buffer, segment.sequence);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
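
For reference, a condensed standalone sketch of the header-prediction test as
it appears in the two hunks above (PredictionState, PredictHeader() and the
enum are invented names, and sequence-number wraparound is ignored): the fast
path requires an established connection, an ACK-only segment that is exactly
in sequence, an unchanged non-zero window and nothing pending beyond fSendMax,
and then splits into the pure-ACK (sending side) and in-sequence-data
(receiving side) cases.

#include <cstddef>
#include <cstdint>

// Simplified connection state; names loosely follow the fields in the diff.
struct PredictionState {
    bool established;
    uint32_t receiveNext;        // next sequence number we expect
    uint32_t sendNext, sendMax;  // next to send / highest sent
    uint32_t sendUnacknowledged; // oldest unacknowledged sequence
    uint32_t sendWindow;         // last advertised peer window
    bool receiveQueueContiguous; // no out-of-order data buffered
    size_t receiveQueueFree;     // room left in the receive buffer
};

enum FastPath { NO_FAST_PATH, FAST_PATH_PURE_ACK, FAST_PATH_IN_SEQUENCE_DATA };

// Header prediction: decide whether a segment can take one of the two fast
// paths, or has to go through the full receive processing.
FastPath
PredictHeader(const PredictionState& state, bool ackOnly, uint32_t sequence,
    uint32_t acknowledge, uint32_t advertisedWindow, size_t payloadSize)
{
    if (!state.established || !ackOnly || sequence != state.receiveNext
        || advertisedWindow == 0 || advertisedWindow != state.sendWindow
        || state.sendNext != state.sendMax)
        return NO_FAST_PATH;

    if (payloadSize == 0) {
        // Pure acknowledgement: we are on the sending side, and the segment
        // must acknowledge data that is actually outstanding.
        if (state.sendUnacknowledged < acknowledge
            && acknowledge <= state.sendMax)
            return FAST_PATH_PURE_ACK;
    } else if (acknowledge == state.sendUnacknowledged
        && state.receiveQueueContiguous
        && state.receiveQueueFree >= payloadSize) {
        // In-sequence data with nothing newly acknowledged: receiving side.
        return FAST_PATH_IN_SEQUENCE_DATA;
    }
    return NO_FAST_PATH;
}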
@@ -787,6 +805,7 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
}
// remove duplicate data at the start
dprintf("* remove %ld bytes from the start\n", drop);
gBufferModule->remove_header(buffer, drop);
segment.sequence += drop;
}
@@ -811,6 +830,7 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
}
segment.flags &= ~(TCP_FLAG_FINISH | TCP_FLAG_PUSH);
dprintf("* remove %ld bytes from the end\n", drop);
gBufferModule->remove_trailer(buffer, drop);
}
@@ -837,17 +857,29 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (fSendUnacknowledged >= segment.acknowledge) {
// this is a duplicate acknowledge
// TODO: handle this!
fDuplicateAcknowledgeCount++;
if (buffer->size == 0 && advertisedWindow == fSendWindow
&& (segment.flags & TCP_FLAG_FINISH) == 0) {
dprintf("duplicate ack!\n");
fDuplicateAcknowledgeCount++;
gStackModule->cancel_timer(&fRetransmitTimer);
fSendNext = segment.acknowledge;
_SendQueued();
return DROP;
}
} else {
// this segment acknowleges in flight data
// this segment acknowledges in flight data
fDuplicateAcknowledgeCount = 0;
if (fSendMax == segment.acknowledge) {
// there is no outstanding data to be acknowledged
// TODO: if the transmit timer function is already waiting
// to acquire this endpoint's lock, we should stop it anyway
dprintf("all inflight data ack'd!\n");
gStackModule->cancel_timer(&fRetransmitTimer);
} else {
dprintf("set retransmit timer!\n");
// TODO: set retransmit timer correctly
if (!gStackModule->is_timer_active(&fRetransmitTimer))
gStackModule->set_timer(&fRetransmitTimer, 1000000LL);
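
A minimal standalone model of the acknowledge bookkeeping this hunk introduces
(AckState and ProcessAcknowledge() are invented for the sketch; the real code
additionally rewinds fSendNext and retransmits via _SendQueued(), and drives
the kernel timer module instead of a bool): duplicate ACKs are only counted
when the segment carries no data, no FIN and no window change; anything that
acknowledges new data resets the counter and either cancels or (re)arms the
retransmit timer.

#include <cstddef>
#include <cstdint>

struct AckState {
    uint32_t sendUnacknowledged;
    uint32_t sendMax;
    uint32_t sendWindow;
    uint32_t duplicateAcknowledgeCount = 0;
    bool retransmitTimerActive = false;
};

// Processes the acknowledge field of an incoming segment; returns true if it
// counts as a duplicate acknowledgement (which the real code answers by
// retransmitting from segment.acknowledge).
bool
ProcessAcknowledge(AckState& state, uint32_t acknowledge, size_t payloadSize,
    uint32_t advertisedWindow, bool finFlag)
{
    if (state.sendUnacknowledged >= acknowledge) {
        // Nothing new is acknowledged: a candidate duplicate ACK, but only
        // if it carries no data, no FIN, and no window update.
        if (payloadSize == 0 && advertisedWindow == state.sendWindow
            && !finFlag) {
            state.duplicateAcknowledgeCount++;
            state.retransmitTimerActive = false;  // cancel_timer() in the diff
            return true;
        }
        return false;
    }

    // This segment acknowledges data in flight.
    state.duplicateAcknowledgeCount = 0;
    state.sendUnacknowledged = acknowledge;  // done elsewhere in the real code

    if (state.sendMax == acknowledge) {
        // Everything in flight is acknowledged; stop the retransmit timer.
        state.retransmitTimerActive = false;
    } else if (!state.retransmitTimerActive) {
        // Still outstanding data: make sure the retransmit timer is running.
        state.retransmitTimerActive = true;
    }
    return false;
}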
@@ -861,6 +893,7 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (segment.acknowledge > fSendQueue.LastSequence()) {
// our TCP_FLAG_FINISH has been acknowledged
dprintf("FIN has been acknowledged!\n");
switch (fState) {
case FINISH_SENT:
@@ -892,6 +925,11 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (segment.flags & TCP_FLAG_FINISH) {
dprintf("peer is finishing connection!");
fReceiveNext++;
fFlags |= FLAG_NO_RECEIVE;
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
// other side is closing connection; change states
switch (fState) {
@@ -928,6 +966,7 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (buffer->size > 0) {
if (fReceiveNext == segment.sequence)
fReceiveNext += buffer->size;
dprintf("adding data, receive next = %lu!\n", (uint32)fReceiveNext);
fReceiveQueue.Add(buffer, segment.sequence);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
@@ -1091,7 +1130,7 @@ TCPEndpoint::_SendQueued(bool force)
gAddressModule->set_to((sockaddr *)&buffer->source, (sockaddr *)&socket->address);
gAddressModule->set_to((sockaddr *)&buffer->destination, (sockaddr *)&socket->peer);
TRACE(("TCP:%p.SendQueued() flags %u, buffer %p, size %lu, from address %s to %s\n", this,
TRACE(("TCP:%p.SendQueued() flags %x, buffer %p, size %lu, from address %s to %s\n", this,
segment.flags, buffer, buffer->size,
AddressString(gDomain, (sockaddr *)&buffer->source, true).Data(),
AddressString(gDomain, (sockaddr *)&buffer->destination, true).Data()));