From 03298f9d20fe782497471b9e3e131ee00d3fa526 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Axel=20D=C3=B6rfler?=
Date: Fri, 2 May 2008 17:36:28 +0000
Subject: [PATCH] * The WaitList now always notifies all waiters.

* In SendData(), TCP will now split the buffer into smaller parts if it can
  send data (i.e. there is free space left in the send queue, but not enough
  for the whole buffer, and that free space is more than the socket's send
  low water mark).
* Together, these changes let TCP pass the "forwarding" test of the OpenSSH
  test suite.

git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@25294 a95241bf-73f2-0310-859d-f6bbb57e9c96
---
 .../network/protocols/tcp/TCPEndpoint.cpp | 77 ++++++++++++++-----
 .../network/protocols/tcp/TCPEndpoint.h   |  2 +-
 2 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp
index 5e96e280f1..cfcb9a0d61 100644
--- a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp
+++ b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.cpp
@@ -329,21 +329,20 @@ WaitList::InitCheck() const
 
 
 status_t
-WaitList::Wait(MutexLocker& locker, bigtime_t timeout, bool wakeNext)
+WaitList::Wait(MutexLocker& locker, bigtime_t timeout)
 {
 	locker.Unlock();
 
 	status_t status = B_OK;
 
-	while (status == B_OK && !atomic_test_and_set(&fCondition, 0, 1)) {
+	while (!atomic_test_and_set(&fCondition, 0, 1)) {
 		status = acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
 			timeout);
+		if (status != B_OK)
+			break;
 	}
 
 	locker.Lock();
 
-	if (status == B_OK && wakeNext)
-		Signal();
-
 	return status;
 }
@@ -353,11 +352,11 @@ WaitList::Signal()
 {
 	atomic_or(&fCondition, 1);
 #ifdef __HAIKU__
-	release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE | B_RELEASE_IF_WAITING_ONLY);
+	release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE | B_RELEASE_ALL);
 #else
 	int32 count;
 	if (get_sem_count(fSem, &count) == B_OK && count < 0)
-		release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE);
+		release_sem_etc(fSem, -count, B_DO_NOT_RESCHEDULE);
 #endif
 }
 
@@ -423,6 +422,9 @@ TCPEndpoint::~TCPEndpoint()
 	}
 
 	mutex_destroy(&fLock);
+
+	// TODO: we need to wait for all timers to return
+	//_WaitForTimers();
 }
 
 
@@ -730,17 +732,18 @@ TCPEndpoint::SendData(net_buffer *buffer)
 		return EPIPE;
 	}
 
-	if (buffer->size > 0) {
-		if (buffer->size > fSendQueue.Size())
-			return ENOBUFS;
+	uint32 flags = buffer->flags;
+	size_t left = buffer->size;
 
-		bigtime_t timeout = absolute_timeout(socket->send.timeout);
-		if (gStackModule->is_restarted_syscall())
-			timeout = gStackModule->restore_syscall_restart_timeout();
-		else
-			gStackModule->store_syscall_restart_timeout(timeout);
+	bigtime_t timeout = absolute_timeout(socket->send.timeout);
+	if (gStackModule->is_restarted_syscall())
+		timeout = gStackModule->restore_syscall_restart_timeout();
+	else
+		gStackModule->store_syscall_restart_timeout(timeout);
 
-		while (fSendQueue.Free() < buffer->size) {
+	while (left > 0) {
+		while (fSendQueue.Free() < socket->send.low_water_mark) {
+			// wait until enough space is available
 			status_t status = fSendList.Wait(lock, timeout);
 			if (status < B_OK) {
 				TRACE(" SendData() returning %s (%d)",
@@ -749,19 +752,42 @@
 			}
 		}
 
-		fSendQueue.Add(buffer);
+		size_t size = fSendQueue.Free();
+		if (size < left) {
+			// we need to split the original buffer
+			net_buffer* clone = gBufferModule->clone(buffer, false);
+				// TODO: add offset/size parameter to net_buffer::clone() or
+				// even a move_data() function, as this is a bit inefficient
+			if (clone == NULL)
+				return ENOBUFS;
+
+			status_t status = gBufferModule->trim(clone, size);
+			if (status != B_OK) {
+				gBufferModule->free(clone);
+				return status;
+			}
+
+			gBufferModule->remove_header(buffer, size);
+			left -= size;
+			fSendQueue.Add(clone);
+		} else {
+			left -= buffer->size;
+			fSendQueue.Add(buffer);
+		}
 	}
 
 	TRACE(" SendData(): %lu bytes used.", fSendQueue.Used());
 
 	bool force = false;
-	if ((buffer->flags & MSG_OOB) != 0) {
+	if ((flags & MSG_OOB) != 0) {
 		fSendUrgentOffset = fSendQueue.LastSequence();
 			// RFC 961 specifies that the urgent offset points to the last
 			// byte of urgent data. However, this is commonly implemented as
 			// here, ie. it points to the first byte after the urgent data.
 		force = true;
 	}
 
+	if ((flags & MSG_EOF) != 0)
+		_Disconnect(false);
 	if (fState == ESTABLISHED || fState == FINISH_RECEIVED)
 		_SendQueued(force);
@@ -857,7 +883,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
 		if ((flags & MSG_DONTWAIT) != 0 || socket->receive.timeout == 0)
 			return B_WOULD_BLOCK;
 
-		status_t status = fReceiveList.Wait(locker, timeout, false);
+		status_t status = fReceiveList.Wait(locker, timeout);
 		if (status < B_OK) {
 			// The Open Group base specification mentions that EINTR should be
 			// returned if the recv() is interrupted before _any data_ is
@@ -876,7 +902,7 @@
 	if (numBytes < fReceiveQueue.Available())
 		fReceiveList.Signal();
 
-	bool clone = (flags & MSG_PEEK);
+	bool clone = (flags & MSG_PEEK) != 0;
 
 	ssize_t receivedBytes = fReceiveQueue.Get(numBytes, !clone, _buffer);
 
@@ -2159,6 +2185,17 @@ TCPEndpoint::_TimeWaitTimer(net_timer* timer, void* _endpoint)
 	TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
 	T(Timer(endpoint, "time-wait"));
 
+	MutexLocker locker(endpoint->fLock);
+	if (!locker.IsLocked())
+		return;
+
+	if ((endpoint->fFlags & FLAG_CLOSED) == 0) {
+		endpoint->fFlags |= FLAG_DELETE_ON_CLOSE;
+		return;
+	}
+
+	locker.Unlock();
+
 	gSocketModule->delete_socket(endpoint->socket);
 }
 
diff --git a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h
index 8ce4752f0b..11dba9161c 100644
--- a/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h
+++ b/src/add-ons/kernel/network/protocols/tcp/TCPEndpoint.h
@@ -32,7 +32,7 @@ public:
 
 	status_t InitCheck() const;
 
-	status_t Wait(MutexLocker &, bigtime_t timeout, bool wakeNext = true);
+	status_t Wait(MutexLocker &, bigtime_t timeout);
 	void Signal();
 
 private:
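
The WaitList change above is a wake-one to wake-all switch: B_RELEASE_ALL
releases every thread blocked on the semaphore, and each woken waiter retests
fCondition itself, instead of one waiter being woken and re-Signal()ing the
next via the removed wakeNext path. The following standalone sketch shows the
same semantics with standard C++ primitives; the WaitList name is reused here
purely for illustration, this is not the Haiku kernel API:

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct WaitList {
	std::mutex lock;
	std::condition_variable condition;
	bool signaled = false;

	void Wait()
	{
		std::unique_lock<std::mutex> locker(lock);
		// As in the patched loop, every waiter retests the condition
		// after waking up.
		condition.wait(locker, [this] { return signaled; });
	}

	void Signal()
	{
		{
			std::lock_guard<std::mutex> locker(lock);
			signaled = true;
		}
		// Before the patch, one waiter was woken and was expected to
		// re-signal the next; waking all of them at once cannot strand
		// a waiter if that chain breaks.
		condition.notify_all();
	}
};

int main()
{
	WaitList list;
	std::vector<std::thread> waiters;
	for (int i = 0; i < 3; i++) {
		waiters.emplace_back([&list, i]() {
			list.Wait();
			std::printf("waiter %d woke up\n", i);
		});
	}
	list.Signal();
	for (std::thread& waiter : waiters)
		waiter.join();
	return 0;
}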
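
The SendData() rework no longer rejects a buffer larger than the queue's free
space with ENOBUFS: it enqueues whatever fits once at least the send low water
mark is free, and loops until everything is queued. Below is a minimal
single-threaded sketch of that splitting strategy over a plain byte queue;
SendQueue and Drain() are hypothetical stand-ins (the real code clones and
trims net_buffers and blocks on fSendList.Wait() instead of draining):

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <deque>

struct SendQueue {
	size_t capacity;
	std::deque<char> data;

	size_t Free() const { return capacity - data.size(); }

	void Add(const char* buffer, size_t size)
	{
		data.insert(data.end(), buffer, buffer + size);
	}

	// Stands in for the peer ACKing data, which frees queue space.
	void Drain(size_t size)
	{
		data.erase(data.begin(),
			data.begin() + std::min(size, data.size()));
	}
};

void SendData(SendQueue& queue, const char* buffer, size_t size,
	size_t lowWaterMark)
{
	size_t left = size;
	while (left > 0) {
		while (queue.Free() < lowWaterMark) {
			// TCPEndpoint blocks on fSendList.Wait() here; this sketch
			// simulates the ACK that would eventually free space.
			queue.Drain(4);
		}

		// Enqueue the part that fits instead of waiting until there is
		// room for the whole remainder.
		size_t chunk = std::min(queue.Free(), left);
		queue.Add(buffer + (size - left), chunk);
		left -= chunk;
		std::printf("queued %zu bytes, %zu left\n", chunk, left);
	}
}

int main()
{
	SendQueue queue = {16, {}};
	char payload[40] = {};
	SendData(queue, payload, sizeof(payload), 4);
	return 0;
}

Splitting at the low water mark is what lets a large write make progress as
soon as the remote side ACKs a window's worth of data, rather than stalling
until the entire buffer fits at once; per the commit message, that is what
unblocked the OpenSSH "forwarding" test.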