TCP: initial shutdown() implementation and some general fixes

* set FLAG_NO_RECEIVE/FLAG_NO_SEND on shutdown() and send FIN on SHUT_WR
 * if a send() is attempted with FLAG_NO_SEND set return EPIPE
 * proper handling of recv timeout in ReadData(), using absolute timeout instead of relative
 * if FLAG_NO_RECEIVE is set, don't attached more segments to the receive queue, drop them instead


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@20542 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Hugo Santos 2007-04-03 18:38:42 +00:00
parent 15fda8f547
commit 52e75b6236
2 changed files with 77 additions and 37 deletions

View File

@ -59,6 +59,7 @@ enum {
FLAG_OPTION_WINDOW_SHIFT = 0x01,
FLAG_OPTION_TIMESTAMP = 0x02,
FLAG_NO_RECEIVE = 0x04,
FLAG_NO_SEND = 0x08,
};
@ -162,20 +163,9 @@ TCPEndpoint::Close()
return B_OK;
}
tcp_state previousState = fState;
if (fState == SYNCHRONIZE_RECEIVED || fState == ESTABLISHED)
fState = FINISH_SENT;
else if (fState == FINISH_RECEIVED)
fState = WAIT_FOR_FINISH_ACKNOWLEDGE;
else
fState = CLOSED;
status_t status = _SendQueued();
if (status != B_OK) {
fState = previousState;
status_t status = _ShutdownEgress(true);
if (status != B_OK)
return status;
}
TRACE("Close(): Entering state %d", fState);
@ -381,9 +371,18 @@ TCPEndpoint::Listen(int count)
status_t
TCPEndpoint::Shutdown(int direction)
{
TRACE("Shutdown()");
// TODO: implement shutdown!
return B_ERROR;
TRACE("Shutdown(%i)", direction);
RecursiveLocker lock(fLock);
if (direction == SHUT_RD || direction == SHUT_RDWR) {
fFlags |= FLAG_NO_RECEIVE;
}
if (direction == SHUT_WR || direction == SHUT_RDWR)
_ShutdownEgress(false);
return B_OK;
}
@ -396,6 +395,13 @@ TCPEndpoint::SendData(net_buffer *buffer)
TRACE("SendData(buffer %p, size %lu, flags %lx)",
buffer, buffer->size, buffer->flags);
RecursiveLocker lock(fLock);
if (fFlags & FLAG_NO_SEND) {
// TODO: send SIGPIPE signal to app?
return EPIPE;
}
size_t bytesLeft = buffer->size;
do {
@ -413,17 +419,15 @@ TCPEndpoint::SendData(net_buffer *buffer)
} else
chunk = buffer;
recursive_lock_lock(&fLock);
while (fSendQueue.Free() < chunk->size) {
recursive_lock_unlock(&fLock);
lock.Unlock();
status_t status = acquire_sem_etc(fSendLock, 1,
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, socket->send.timeout);
if (status < B_OK)
return status;
recursive_lock_lock(&fLock);
lock.Lock();
}
// TODO: check state!
@ -438,8 +442,6 @@ TCPEndpoint::SendData(net_buffer *buffer)
status_t status = _SendQueued();
recursive_lock_unlock(&fLock);
if (buffer != chunk) {
// as long as we didn't eat the buffer, we can still return an error code
// (we don't own the buffer if we return an error code)
@ -491,17 +493,23 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
// read data out of buffer
// TODO: add support for urgent data (MSG_OOB)
// TODO: wait until enough bytes are available
do {
// TODO: we may wait much longer than the time we wanted to...
status_t status = acquire_sem_etc(fReceiveLock, 1,
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, socket->receive.timeout);
if (status < B_OK)
return status;
} while (fReceiveQueue.Available() < socket->receive.low_water_mark
&& (fFlags & FLAG_NO_RECEIVE) == 0);
bigtime_t timeout = system_time() + socket->receive.timeout;
RecursiveLocker locker(fLock);
do {
locker.Unlock();
status_t status = acquire_sem_etc(fReceiveLock, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
if (status < B_OK)
return status;
locker.Lock();
} while (fReceiveQueue.Available() < socket->receive.low_water_mark
&& (fFlags & FLAG_NO_RECEIVE) == 0);
TRACE("ReadData(): read %lu bytes, %lu are available",
numBytes, fReceiveQueue.Available());
@ -777,15 +785,20 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
TRACE("Receive(): header prediction receive!");
// we're on the receiving end of the connection, and this segment
// is the one we were expecting, in-sequence
fReceiveNext += buffer->size;
TRACE("Receive(): receive next = %lu", (uint32)fReceiveNext);
fReceiveQueue.Add(buffer, segment.sequence);
if (fFlags & FLAG_NO_RECEIVE) {
return DROP;
} else {
fReceiveNext += buffer->size;
TRACE("Receive(): receive next = %lu", (uint32)fReceiveNext);
fReceiveQueue.Add(buffer, segment.sequence);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ,
fReceiveQueue.Available());
return KEEP | ACKNOWLEDGE;
return KEEP | ACKNOWLEDGE;
}
}
}
@ -1256,6 +1269,32 @@ TCPEndpoint::_AvailableBytesOrDisconnect() const
}
status_t
TCPEndpoint::_ShutdownEgress(bool closing)
{
tcp_state previousState = fState;
if (fState == SYNCHRONIZE_RECEIVED || fState == ESTABLISHED)
fState = FINISH_SENT;
else if (fState == FINISH_RECEIVED)
fState = WAIT_FOR_FINISH_ACKNOWLEDGE;
else if (closing)
fState = CLOSED;
else
return B_OK;
status_t status = _SendQueued();
if (status != B_OK) {
fState = previousState;
return status;
}
fFlags |= FLAG_NO_SEND;
return B_OK;
}
// #pragma mark - timer

View File

@ -67,6 +67,7 @@ class TCPEndpoint : public net_protocol {
status_t _SendQueued(bool force = false);
int _GetMSS(const struct sockaddr *) const;
ssize_t _AvailableBytesOrDisconnect() const;
status_t _ShutdownEgress(bool closing);
static void _TimeWaitTimer(net_timer *timer, void *data);
static void _RetransmitTimer(net_timer *timer, void *data);