* The WaitList now always notifies all waiters.

* In SendData(), TCP will now split the buffer into smaller parts if it
  can send data (ie. there is free space in the buffer queue left, but
  not enough to send the whole buffer, and the free space is more than
  the send low water mark of the socket).
* Both of these changes together let TCP now pass the "forwarding" test
  of the OpenSSH suite.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@25294 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2008-05-02 17:36:28 +00:00
parent c1b9831bbe
commit 03298f9d20
2 changed files with 58 additions and 21 deletions

View File

@ -329,21 +329,20 @@ WaitList::InitCheck() const
status_t status_t
WaitList::Wait(MutexLocker& locker, bigtime_t timeout, bool wakeNext) WaitList::Wait(MutexLocker& locker, bigtime_t timeout)
{ {
locker.Unlock(); locker.Unlock();
status_t status = B_OK; status_t status = B_OK;
while (status == B_OK && !atomic_test_and_set(&fCondition, 0, 1)) { while (!atomic_test_and_set(&fCondition, 0, 1)) {
status = acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, status = acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
timeout); timeout);
if (status != B_OK)
break;
} }
locker.Lock(); locker.Lock();
if (status == B_OK && wakeNext)
Signal();
return status; return status;
} }
@ -353,11 +352,11 @@ WaitList::Signal()
{ {
atomic_or(&fCondition, 1); atomic_or(&fCondition, 1);
#ifdef __HAIKU__ #ifdef __HAIKU__
release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE | B_RELEASE_IF_WAITING_ONLY); release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE | B_RELEASE_ALL);
#else #else
int32 count; int32 count;
if (get_sem_count(fSem, &count) == B_OK && count < 0) if (get_sem_count(fSem, &count) == B_OK && count < 0)
release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE); release_sem_etc(fSem, -count, B_DO_NOT_RESCHEDULE);
#endif #endif
} }
@ -423,6 +422,9 @@ TCPEndpoint::~TCPEndpoint()
} }
mutex_destroy(&fLock); mutex_destroy(&fLock);
// TODO: we need to wait for all timers to return
//_WaitForTimers();
} }
@ -730,9 +732,8 @@ TCPEndpoint::SendData(net_buffer *buffer)
return EPIPE; return EPIPE;
} }
if (buffer->size > 0) { uint32 flags = buffer->flags;
if (buffer->size > fSendQueue.Size()) size_t left = buffer->size;
return ENOBUFS;
bigtime_t timeout = absolute_timeout(socket->send.timeout); bigtime_t timeout = absolute_timeout(socket->send.timeout);
if (gStackModule->is_restarted_syscall()) if (gStackModule->is_restarted_syscall())
@ -740,7 +741,9 @@ TCPEndpoint::SendData(net_buffer *buffer)
else else
gStackModule->store_syscall_restart_timeout(timeout); gStackModule->store_syscall_restart_timeout(timeout);
while (fSendQueue.Free() < buffer->size) { while (left > 0) {
while (fSendQueue.Free() < socket->send.low_water_mark) {
// wait until enough space is available
status_t status = fSendList.Wait(lock, timeout); status_t status = fSendList.Wait(lock, timeout);
if (status < B_OK) { if (status < B_OK) {
TRACE(" SendData() returning %s (%d)", TRACE(" SendData() returning %s (%d)",
@ -749,19 +752,42 @@ TCPEndpoint::SendData(net_buffer *buffer)
} }
} }
size_t size = fSendQueue.Free();
if (size < left) {
// we need to split the original buffer
net_buffer* clone = gBufferModule->clone(buffer, false);
// TODO: add offset/size parameter to net_buffer::clone() or
// even a move_data() function, as this is a bit inefficient
if (clone == NULL)
return ENOBUFS;
status_t status = gBufferModule->trim(clone, size);
if (status != B_OK) {
gBufferModule->free(clone);
return status;
}
gBufferModule->remove_header(buffer, size);
left -= size;
fSendQueue.Add(clone);
} else {
left -= buffer->size;
fSendQueue.Add(buffer); fSendQueue.Add(buffer);
} }
}
TRACE(" SendData(): %lu bytes used.", fSendQueue.Used()); TRACE(" SendData(): %lu bytes used.", fSendQueue.Used());
bool force = false; bool force = false;
if ((buffer->flags & MSG_OOB) != 0) { if ((flags & MSG_OOB) != 0) {
fSendUrgentOffset = fSendQueue.LastSequence(); fSendUrgentOffset = fSendQueue.LastSequence();
// RFC 961 specifies that the urgent offset points to the last // RFC 961 specifies that the urgent offset points to the last
// byte of urgent data. However, this is commonly implemented as // byte of urgent data. However, this is commonly implemented as
// here, ie. it points to the first byte after the urgent data. // here, ie. it points to the first byte after the urgent data.
force = true; force = true;
} }
if ((flags & MSG_EOF) != 0)
_Disconnect(false);
if (fState == ESTABLISHED || fState == FINISH_RECEIVED) if (fState == ESTABLISHED || fState == FINISH_RECEIVED)
_SendQueued(force); _SendQueued(force);
@ -857,7 +883,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
if ((flags & MSG_DONTWAIT) != 0 || socket->receive.timeout == 0) if ((flags & MSG_DONTWAIT) != 0 || socket->receive.timeout == 0)
return B_WOULD_BLOCK; return B_WOULD_BLOCK;
status_t status = fReceiveList.Wait(locker, timeout, false); status_t status = fReceiveList.Wait(locker, timeout);
if (status < B_OK) { if (status < B_OK) {
// The Open Group base specification mentions that EINTR should be // The Open Group base specification mentions that EINTR should be
// returned if the recv() is interrupted before _any data_ is // returned if the recv() is interrupted before _any data_ is
@ -876,7 +902,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
if (numBytes < fReceiveQueue.Available()) if (numBytes < fReceiveQueue.Available())
fReceiveList.Signal(); fReceiveList.Signal();
bool clone = (flags & MSG_PEEK); bool clone = (flags & MSG_PEEK) != 0;
ssize_t receivedBytes = fReceiveQueue.Get(numBytes, !clone, _buffer); ssize_t receivedBytes = fReceiveQueue.Get(numBytes, !clone, _buffer);
@ -2159,6 +2185,17 @@ TCPEndpoint::_TimeWaitTimer(net_timer* timer, void* _endpoint)
TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint; TCPEndpoint* endpoint = (TCPEndpoint*)_endpoint;
T(Timer(endpoint, "time-wait")); T(Timer(endpoint, "time-wait"));
MutexLocker locker(endpoint->fLock);
if (!locker.IsLocked())
return;
if ((endpoint->fFlags & FLAG_CLOSED) == 0) {
endpoint->fFlags |= FLAG_DELETE_ON_CLOSE;
return;
}
locker.Unlock();
gSocketModule->delete_socket(endpoint->socket); gSocketModule->delete_socket(endpoint->socket);
} }

View File

@ -32,7 +32,7 @@ public:
status_t InitCheck() const; status_t InitCheck() const;
status_t Wait(MutexLocker &, bigtime_t timeout, bool wakeNext = true); status_t Wait(MutexLocker &, bigtime_t timeout);
void Signal(); void Signal();
private: private: