even more TCP fixage.

- don't try to remove spawned sockets from Endpoints hashtable.
 - return B_WOULD_BLOCK (EAGAIN) when we time out in acquire_sem().
 - use B_RELEASE_IF_WAITING_ONLY in the TCP WaitList.
 - fixed a off by one issue in ReadData() which could result in more than needed iterations (and waiting).
 - implemented prepending new buffers to a net_buffer.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@20639 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Hugo Santos 2007-04-10 16:47:47 +00:00
parent 4620ac73aa
commit f73dd74eb9
4 changed files with 99 additions and 59 deletions

View File

@ -319,25 +319,27 @@ EndpointManager::Unbind(TCPEndpoint *endpoint)
RecursiveLocker locker(&fLock);
TCPEndpoint *other = _LookupEndpoint(gAddressModule->get_port(
(sockaddr *)&endpoint->socket->address));
if (other != endpoint) {
// remove endpoint from the list of endpoints with the same port
while (other != NULL && other->fEndpointNextWithSamePort != endpoint) {
other = other->fEndpointNextWithSamePort;
if (!endpoint->fSpawned) {
TCPEndpoint *other = _LookupEndpoint(gAddressModule->get_port(
(sockaddr *)&endpoint->socket->address));
if (other != endpoint) {
// remove endpoint from the list of endpoints with the same port
while (other != NULL && other->fEndpointNextWithSamePort != endpoint) {
other = other->fEndpointNextWithSamePort;
}
if (other != NULL)
other->fEndpointNextWithSamePort = endpoint->fEndpointNextWithSamePort;
else
panic("bound endpoint %p not in hash!", endpoint);
} else {
// we need to replace the first endpoint in the list
hash_remove(fEndpointHash, endpoint);
other = endpoint->fEndpointNextWithSamePort;
if (other != NULL)
hash_insert(fEndpointHash, other);
}
if (other != NULL)
other->fEndpointNextWithSamePort = endpoint->fEndpointNextWithSamePort;
else
panic("bound endpoint %p not in hash!", endpoint);
} else {
// we need to replace the first endpoint in the list
hash_remove(fEndpointHash, endpoint);
other = endpoint->fEndpointNextWithSamePort;
if (other != NULL)
hash_insert(fEndpointHash, other);
}
endpoint->fEndpointNextWithSamePort = NULL;

View File

@ -49,8 +49,9 @@
//#define TRACE_TCP
#ifdef TRACE_TCP
// the space after 'this' is important in order for this to work with cpp 2.95
# define TRACE(format, args...) dprintf("TCP:%p:" format "\n", this , ##args)
// the space before ', ##args' is important in order for this to work with cpp 2.95
# define TRACE(format, args...) dprintf("TCP [%llu] %p (%12s) " format "\n", \
system_time(), this, name_for_state(fState) , ##args)
#else
# define TRACE(args...) do { } while (0)
#endif
@ -79,8 +80,17 @@ absolute_timeout(bigtime_t timeout)
}
static inline status_t
posix_error(status_t error)
{
if (error == B_TIMED_OUT)
return B_WOULD_BLOCK;
return error;
}
WaitList::WaitList(const char *name)
: fWaiting(0)
{
fSem = create_sem(0, name);
}
@ -100,15 +110,13 @@ WaitList::InitCheck() const
status_t
WaitList::Wait(RecursiveLocker &locker, bigtime_t timeout)
WaitList::Wait(RecursiveLocker &locker, bigtime_t timeout, bool wakeNext)
{
// this function is called with `locker' held.
fWaiting++;
locker.Unlock();
status_t status = acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT |
B_CAN_INTERRUPT, timeout);
status_t status = acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT
| B_CAN_INTERRUPT, timeout);
locker.Lock();
if (status == B_OK)
if (wakeNext && status == B_OK)
Signal();
return status;
}
@ -117,14 +125,8 @@ WaitList::Wait(RecursiveLocker &locker, bigtime_t timeout)
void
WaitList::Signal()
{
// the same locker used with Wait() must be held
// when calling this function.
if (fWaiting == 0)
return;
fWaiting--;
release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE);
release_sem_etc(fSem, 1, B_DO_NOT_RESCHEDULE
| B_RELEASE_IF_WAITING_ONLY);
}
@ -152,7 +154,8 @@ TCPEndpoint::TCPEndpoint(net_socket *socket)
fRoundTripTime(TCP_INITIAL_RTT),
fState(CLOSED),
fFlags(0), //FLAG_OPTION_WINDOW_SHIFT),
fError(B_OK)
fError(B_OK),
fSpawned(false)
{
//gStackModule->init_timer(&fTimer, _TimeWait, this);
@ -228,7 +231,7 @@ TCPEndpoint::Close()
if (status != B_OK)
return status;
TRACE("Close(): Entering state %s", name_for_state(fState));
TRACE("Close() after Shutdown()");
if (socket->options & SO_LINGER) {
TRACE("Close(): Lingering for %i secs", socket->linger);
@ -277,13 +280,8 @@ TCPEndpoint::Connect(const struct sockaddr *address)
TRACE("Connect() on address %s",
AddressString(gDomain, address, true).Data());
if (address->sa_family != AF_INET)
return EAFNOSUPPORT;
RecursiveLocker locker(&fLock);
TRACE(" Connect(): in state %s", name_for_state(fState));
// Can only call connect() from CLOSED or LISTEN states
// otherwise endpoint is considered already connected
if (fState == LISTEN) {
@ -355,7 +353,7 @@ TCPEndpoint::Connect(const struct sockaddr *address)
TRACE(" Connect(): Connection complete: %s", strerror(status));
return status;
return posix_error(status);
}
@ -506,7 +504,7 @@ TCPEndpoint::SendData(net_buffer *buffer)
while (fSendQueue.Free() < chunk->size) {
status_t status = fSendList.Wait(lock, timeout);
if (status < B_OK)
return status;
return posix_error(status);
}
// TODO: check state!
@ -569,7 +567,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
// we need to wait until the connection becomes established
status_t status = fSendList.Wait(locker, timeout);
if (status < B_OK)
return status;
return posix_error(status);
}
}
@ -589,8 +587,8 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
return B_OK;
}
if (fReceiveQueue.Available() > dataNeeded ||
(fReceiveQueue.PushedData() > 0
if (fReceiveQueue.Available() >= dataNeeded ||
((fReceiveQueue.PushedData() > 0)
&& (fReceiveQueue.PushedData() >= fReceiveQueue.Available())))
break;
@ -603,7 +601,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
if (flags & MSG_DONTWAIT)
return B_WOULD_BLOCK;
status_t status = fReceiveList.Wait(locker, timeout);
status_t status = fReceiveList.Wait(locker, timeout, false);
if (status < B_OK) {
// The Open Group base specification mentions that EINTR should be
// returned if the recv() is interrupted before _any data_ is
@ -613,7 +611,7 @@ TCPEndpoint::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
&& fReceiveQueue.Available() > 0)
break;
return status;
return posix_error(status);
}
}
@ -707,8 +705,8 @@ TCPEndpoint::UpdateTimeWait()
int32
TCPEndpoint::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
{
TRACE("ListenReceive(): Connection in state %s received packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!",
name_for_state(fState), buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge);
TRACE("ListenReceive(): packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!",
buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge);
// Essentially, we accept only TCP_FLAG_SYNCHRONIZE in this state,
// but the error behaviour differs
@ -733,6 +731,8 @@ TCPEndpoint::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
TCPEndpoint *endpoint = (TCPEndpoint *)newSocket->first_protocol;
endpoint->fSpawned = true;
// TODO: proper error handling!
endpoint->fRoute = gDatalinkModule->get_route(gDomain,
@ -796,6 +796,9 @@ TCPEndpoint::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
int32
TCPEndpoint::SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *buffer)
{
TRACE("SynchronizeReceive(): packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!",
buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge);
if ((segment.flags & TCP_FLAG_ACKNOWLEDGE) != 0
&& (fInitialSendSequence >= segment.acknowledge
|| fSendMax < segment.acknowledge))
@ -855,8 +858,8 @@ TCPEndpoint::SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *buf
int32
TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
{
TRACE("Receive(): Connection in state %s received packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!",
name_for_state(fState), buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge);
TRACE("Receive(): packet %p (%lu bytes) with flags 0x%x, seq %lu, ack %lu!",
buffer, buffer->size, segment.flags, segment.sequence, segment.acknowledge);
// TODO: rethink locking!
@ -1146,7 +1149,7 @@ TCPEndpoint::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (buffer->size > 0 || (segment.flags & TCP_FLAG_SYNCHRONIZE) != 0)
action |= ACKNOWLEDGE;
TRACE("Receive():Entering state %s, segment action %ld", name_for_state(fState), action);
TRACE("Receive() Action %ld", action);
return action;
}

View File

@ -30,12 +30,11 @@ public:
status_t InitCheck() const;
status_t Wait(RecursiveLocker &, bigtime_t timeout);
status_t Wait(RecursiveLocker &, bigtime_t timeout, bool wakeNext = true);
void Signal();
private:
sem_id fSem;
int32 fWaiting;
};
@ -143,6 +142,8 @@ class TCPEndpoint : public net_protocol {
uint32 fFlags;
status_t fError;
bool fSpawned;
// timer
net_timer fRetransmitTimer;
net_timer fPersistTimer;

View File

@ -632,13 +632,47 @@ prepend_size(net_buffer *_buffer, size_t size, void **_contiguousBuffer)
if (node->header_space < size) {
// we need to prepend a new buffer
// TODO: implement me!
panic("prepending buffer not implemented\n");
size_t bytesLeft = size;
do {
if (node->header_space == 0) {
size_t headerSpace = BUFFER_SIZE - sizeof(data_header);
// no more space, need another data node
data_header *header = create_data_header(headerSpace);
if (header == NULL) {
// TODO: free up headers we already allocated!
return B_NO_MEMORY;
}
data_node *previous = node;
node = (data_node *)add_data_node(header);
init_data_node(node, headerSpace);
node->header_space = header->data_space;
header->first_node = node;
list_insert_item_before(&buffer->buffers, previous, node);
} else {
size_t willConsume = min_c(size, node->header_space);
node->header_space -= willConsume;
node->start -= willConsume;
node->used += willConsume;
bytesLeft -= willConsume;
}
} while (bytesLeft > 0);
size_t offset = 0;
for (node = (data_node *)list_get_first_item(&buffer->buffers);
node != NULL; node = (data_node *)
list_get_next_item(&buffer->buffers, node)) {
node->offset = offset;
offset += node->used;
}
if (_contiguousBuffer)
*_contiguousBuffer = NULL;
return B_ERROR;
return B_OK;
}
// the data fits into this buffer