keep PUSH'ed pointer in receiver side of TCP so we can wait for more data in recv()
git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@20569 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
parent
8338b05a30
commit
fcd6d8cd92
|
@ -26,7 +26,8 @@ BufferQueue::BufferQueue(size_t maxBytes)
|
|||
fNumBytes(0),
|
||||
fContiguousBytes(0),
|
||||
fFirstSequence(0),
|
||||
fLastSequence(0)
|
||||
fLastSequence(0),
|
||||
fPushPointer(0)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -352,3 +353,9 @@ BufferQueue::Available(tcp_sequence sequence) const
|
|||
|
||||
return fContiguousBytes + fFirstSequence - sequence;
|
||||
}
|
||||
|
||||
void
|
||||
BufferQueue::SetPushPointer(tcp_sequence sequence)
|
||||
{
|
||||
fPushPointer = sequence;
|
||||
}
|
||||
|
|
|
@ -33,6 +33,9 @@ class BufferQueue {
|
|||
size_t Available() const { return fContiguousBytes; }
|
||||
size_t Available(tcp_sequence sequence) const;
|
||||
|
||||
size_t PushedData() const { return fPushPointer > fFirstSequence ? fPushPointer - fFirstSequence : 0; }
|
||||
void SetPushPointer(tcp_sequence sequence);
|
||||
|
||||
size_t Used() const { return fNumBytes; }
|
||||
size_t Free() const { return fMaxBytes - fNumBytes; }
|
||||
|
||||
|
@ -49,6 +52,7 @@ class BufferQueue {
|
|||
size_t fContiguousBytes;
|
||||
tcp_sequence fFirstSequence;
|
||||
tcp_sequence fLastSequence;
|
||||
tcp_sequence fPushPointer;
|
||||
};
|
||||
|
||||
#endif // BUFFER_QUEUE_H
|
||||
|
|
|
@ -496,21 +496,27 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
|
|||
|
||||
// read data out of buffer
|
||||
// TODO: add support for urgent data (MSG_OOB)
|
||||
// TODO: wait until enough bytes are available
|
||||
|
||||
bigtime_t timeout = system_time() + socket->receive.timeout;
|
||||
|
||||
do {
|
||||
while (fReceiveQueue.PushedData() == 0
|
||||
&& fReceiveQueue.Available() < numBytes
|
||||
&& (fFlags & FLAG_NO_RECEIVE) == 0) {
|
||||
locker.Unlock();
|
||||
|
||||
status_t status = acquire_sem_etc(fReceiveLock, 1,
|
||||
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
|
||||
if (status < B_OK)
|
||||
return status;
|
||||
|
||||
locker.Lock();
|
||||
} while (fReceiveQueue.Available() < socket->receive.low_water_mark
|
||||
&& (fFlags & FLAG_NO_RECEIVE) == 0);
|
||||
|
||||
if (status < B_OK) {
|
||||
// TODO: If we are timing out, should we push the
|
||||
// available data?
|
||||
if (status == B_TIMED_OUT && fReceiveQueue.Available() > 0)
|
||||
break;
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
TRACE("ReadData(): read %lu bytes, %lu are available",
|
||||
numBytes, fReceiveQueue.Available());
|
||||
|
@ -793,6 +799,8 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
|
|||
fReceiveNext += buffer->size;
|
||||
TRACE("Receive(): receive next = %lu", (uint32)fReceiveNext);
|
||||
fReceiveQueue.Add(buffer, segment.sequence);
|
||||
if (segment.flags & TCP_FLAG_PUSH)
|
||||
fReceiveQueue.SetPushPointer(fReceiveNext);
|
||||
|
||||
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
|
||||
// TODO: real conditional locking needed!
|
||||
|
@ -1018,6 +1026,9 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
|
|||
} else
|
||||
gBufferModule->free(buffer);
|
||||
|
||||
if (segment.flags & TCP_FLAG_PUSH)
|
||||
fReceiveQueue.SetPushPointer(fReceiveNext);
|
||||
|
||||
return action;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue