* Reintroduced the accept semaphore: it will be inherited by the new connections

of a LISTEN socket.
* Reading data should now more or less work, too.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@19401 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2006-11-30 20:47:26 +00:00
parent 94fb808a48
commit 02894e3670
4 changed files with 61 additions and 12 deletions

View File

@ -52,6 +52,8 @@ BufferQueue::SetMaxBytes(size_t maxBytes)
void
BufferQueue::SetInitialSequence(tcp_sequence sequence)
{
TRACE(("BufferQueue@%p::SetInitialSequence(%lu)\n", this, (uint32)sequence));
fFirstSequence = fLastSequence = sequence;
}
@ -264,8 +266,14 @@ BufferQueue::Get(net_buffer *buffer, tcp_sequence sequence, size_t bytes)
status_t
BufferQueue::Get(size_t bytes, bool remove, net_buffer **_buffer)
{
if (Available() < bytes || bytes == 0)
return B_BAD_VALUE;
if (bytes > Available())
bytes = Available();
if (bytes == 0) {
// we don't need to create a buffer when there is no data
*_buffer = NULL;
return B_OK;
}
net_buffer *buffer = fList.First();
size_t bytesLeft = bytes;

View File

@ -101,7 +101,6 @@ TCPConnection::~TCPConnection()
recursive_lock_destroy(&fLock);
//benaphore_destroy(&fReceiveLock);
//benaphore_destroy(&fSendLock);
//delete_sem(fAcceptSemaphore);
delete_sem(fReceiveLock);
delete_sem(fSendLock);
}
@ -110,12 +109,12 @@ TCPConnection::~TCPConnection()
status_t
TCPConnection::InitCheck() const
{
if (fLock.sem < B_OK)
return fLock.sem;
if (fReceiveLock < B_OK)
return fReceiveLock;
if (fSendLock < B_OK)
return fSendLock;
//if (fAcceptSemaphore < B_OK)
// return fAcceptSemaphore;
return B_OK;
}
@ -139,6 +138,9 @@ TCPConnection::Close()
TRACE(("TCP:%p.Close()\n", this));
RecursiveLocker lock(fLock);
if (fState == LISTEN)
delete_sem(fAcceptSemaphore);
if (fState == SYNCHRONIZE_SENT || fState == LISTEN) {
fState = CLOSED;
return B_OK;
@ -281,7 +283,7 @@ TCPConnection::Accept(struct net_socket **_acceptedSocket)
// TODO: test for non-blocking I/O
status_t status;
do {
status = acquire_sem_etc(fReceiveLock, 1, B_RELATIVE_TIMEOUT,
status = acquire_sem_etc(fAcceptSemaphore, 1, B_RELATIVE_TIMEOUT,
socket->receive.timeout);
if (status < B_OK)
return status;
@ -320,6 +322,10 @@ TCPConnection::Bind(sockaddr *address)
// TODO: use port 40000 and following for now
static int e = 40000;
gAddressModule->set_port((sockaddr *)&socket->address, htons(e++));
TRACE(("TCP:%p.Bind() on address with ephemeral port %s, peer %s\n", this,
AddressString(gDomain, (sockaddr *)&socket->address, true).Data(),
AddressString(gDomain, (sockaddr *)&socket->peer, true).Data()));
status = insert_connection(this);
} else {
status = insert_connection(this);
@ -355,6 +361,7 @@ TCPConnection::Listen(int count)
if (fState != CLOSED)
return B_BAD_VALUE;
fAcceptSemaphore = create_sem(0, "tcp accept");
fState = LISTEN;
return B_OK;
}
@ -414,6 +421,13 @@ TCPConnection::ReadData(size_t numBytes, uint32 flags, net_buffer** _buffer)
// read data out of buffer
// TODO: add support for urgent data (MSG_OOB)
// TODO: wait until enough bytes are available
do {
// TODO: we may wait much longer than the time we wanted to...
status_t status = acquire_sem_etc(fReceiveLock, 1,
B_RELATIVE_TIMEOUT | B_CAN_INTERRUPT, socket->receive.timeout);
if (status < B_OK)
return status;
} while (fReceiveQueue.Available() < socket->receive.low_water_mark);
RecursiveLocker locker(fLock);
return fReceiveQueue.Get(numBytes, (flags & MSG_PEEK) == 0, _buffer);
@ -514,7 +528,9 @@ TCPConnection::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
return DROP;
connection->fInitialReceiveSequence = segment.sequence;
connection->fReceiveQueue.SetInitialSequence(segment.sequence + 1);
connection->fState = SYNCHRONIZE_RECEIVED;
connection->fAcceptSemaphore = fAcceptSemaphore;
connection->fReceiveMaxSegmentSize = connection->fRoute->mtu - 40;
// 40 bytes for IP and TCP header without any options
// TODO: make this depending on the RTF_LOCAL flag?
@ -542,6 +558,7 @@ TCPConnection::ListenReceive(tcp_segment_header &segment, net_buffer *buffer)
// we handled this flag now, it must not be set for further processing
return connection->Receive(segment, buffer);
// TODO: here, the ack/delayed ack call will be made on the parent socket!
}
@ -562,19 +579,26 @@ TCPConnection::SynchronizeSentReceive(tcp_segment_header &segment, net_buffer *b
if ((segment.flags & TCP_FLAG_SYNCHRONIZE) == 0)
return DROP;
fInitialReceiveSequence = segment.sequence;
segment.sequence++;
fSendUnacknowledged = segment.acknowledge;
fReceiveNext = segment.sequence;
fInitialReceiveSequence = segment.sequence;
fReceiveQueue.SetInitialSequence(fReceiveNext);
if (segment.flags & TCP_FLAG_ACKNOWLEDGE) {
// the connection has been established
fState = ESTABLISHED;
if (socket->parent != NULL) {
gSocketModule->set_connected(socket);
release_sem_etc(fAcceptSemaphore, 1, B_DO_NOT_RESCHEDULE);
}
release_sem_etc(fSendLock, 1, B_DO_NOT_RESCHEDULE);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: this is not enough - we need to use B_RELEASE_ALL
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
} else {
// simultaneous open
fState = SYNCHRONIZE_RECEIVED;
@ -633,8 +657,8 @@ TCPConnection::Receive(tcp_segment_header &segment, net_buffer *buffer)
&& fReceiveQueue.Free() >= buffer->size) {
// we're on the receiving end of the connection, and this segment
// is the one we were expecting, in-sequence
fReceiveQueue.Add(buffer, segment.sequence);
fReceiveNext += buffer->size;
fReceiveQueue.Add(buffer, segment.sequence);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
@ -656,6 +680,10 @@ TCPConnection::Receive(tcp_segment_header &segment, net_buffer *buffer)
// TODO: close connection depending on state
fError = ECONNREFUSED;
fState = CLOSED;
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
}
return DROP;
@ -717,9 +745,17 @@ TCPConnection::Receive(tcp_segment_header &segment, net_buffer *buffer)
// process acknowledged data
if (fState == SYNCHRONIZE_RECEIVED) {
// TODO: window scaling!
gSocketModule->set_connected(socket);
//fReceiveQueue.SetInitialSequence(fReceiveNext);
if (socket->parent != NULL) {
gSocketModule->set_connected(socket);
release_sem_etc(fAcceptSemaphore, 1, B_DO_NOT_RESCHEDULE);
}
fState = ESTABLISHED;
release_sem_etc(fSendLock, 1, B_DO_NOT_RESCHEDULE);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
}
if (fSendMax < segment.acknowledge)
return DROP | IMMEDIATE_ACKNOWLEDGE;
@ -816,6 +852,10 @@ TCPConnection::Receive(tcp_segment_header &segment, net_buffer *buffer)
if (fReceiveNext == segment.sequence)
fReceiveNext += buffer->size;
fReceiveQueue.Add(buffer, segment.sequence);
release_sem_etc(fReceiveLock, 1, B_DO_NOT_RESCHEDULE);
// TODO: real conditional locking needed!
gSocketModule->notify(socket, B_SELECT_READ, fReceiveQueue.Available());
} else
gBufferModule->free(buffer);

View File

@ -74,6 +74,7 @@ class TCPConnection : public net_protocol {
recursive_lock fLock;
sem_id fReceiveLock;
sem_id fSendLock;
sem_id fAcceptSemaphore;
uint8 fOptions;
uint8 fSendWindowShift;

View File

@ -356,7 +356,6 @@ find_connection(sockaddr *local, sockaddr *peer)
net_protocol *
tcp_init_protocol(net_socket *socket)
{
socket->protocol = IPPROTO_TCP;
TCPConnection *protocol = new (std::nothrow) TCPConnection(socket);
if (protocol == NULL)
return NULL;
@ -367,6 +366,7 @@ tcp_init_protocol(net_socket *socket)
}
TRACE(("Creating new TCPConnection: %p\n", protocol));
socket->protocol = IPPROTO_TCP;
return protocol;
}
@ -660,7 +660,7 @@ tcp_init()
if (status < B_OK)
goto err5;
status = gStackModule->register_domain_protocols(AF_INET, SOCK_STREAM, IPPROTO_IP,
status = gStackModule->register_domain_protocols(AF_INET, SOCK_STREAM, 0,
"network/protocols/tcp/v1",
"network/protocols/ipv4/v1",
NULL);