* Added support for SCM_RIGHTS (sending file descriptors).

* Fixed shutdown(). It was computing the wrong fifo flags and set the
  wrong ones from the wrong variable on the peer fifo.
* Generally made the Unix sockets behave more like they should. E.g.
  after closing one end, it must still be possible to read from the
  other (as long as there are buffered data). Also fine-tuned when to
  return what errors from recv()/send().


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@24943 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-04-12 09:37:13 +00:00
parent 316b5f9214
commit 59234b36ce
4 changed files with 250 additions and 59 deletions

View File

@ -144,6 +144,10 @@ UnixEndpoint::Free()
{
TRACE("[%ld] %p->UnixEndpoint::Free()\n", find_thread(NULL), this);
UnixEndpointLocker locker(this);
_UnsetReceiveFifo();
RETURN_ERROR(B_OK);
}
@ -248,6 +252,8 @@ UnixEndpoint::Listen(int backlog)
if (fAcceptSemaphore < 0)
RETURN_ERROR(ENOBUFS);
_UnsetReceiveFifo();
fState = UNIX_ENDPOINT_LISTENING;
RETURN_ERROR(B_OK);
@ -348,6 +354,9 @@ UnixEndpoint::Connect(const struct sockaddr *_address)
connectedEndpoint->_Spawn(this, peerFifo);
// update our attributes
_UnsetReceiveFifo();
fPeerEndpoint = connectedEndpoint;
PeerAddress().SetTo(&connectedEndpoint->socket->address);
fPeerEndpoint->AddReference();
@ -459,23 +468,30 @@ UnixEndpoint::Send(net_buffer *buffer)
if (notifyWrite)
gSocketModule->notify(socket, B_SELECT_WRITE, writable);
if (error == UNIX_FIFO_SHUTDOWN) {
// This might either mean, that someone called shutdown() or close(),
// or our peer closed the connection.
if (fPeerEndpoint == peerEndpoint) {
// Orderly shutdown.
error = EPIPE;
} else if (fState == UNIX_ENDPOINT_CLOSED) {
// The FD has been closed.
error = EBADF;
} else {
// Peer closed the connection.
error = EPIPE;
send_signal(find_thread(NULL), SIGPIPE);
}
} else if (error == B_TIMED_OUT && timeout == 0) {
// translate non-blocking timeouts to the correct error code
error = B_WOULD_BLOCK;
switch (error) {
case UNIX_FIFO_SHUTDOWN:
if (fPeerEndpoint == peerEndpoint
&& fState == UNIX_ENDPOINT_CONNECTED) {
// Orderly write shutdown on our side.
// Note: Linux and Solaris also send a SIGPIPE, but according
// the send() specification that shouldn't be done.
error = EPIPE;
} else {
// The FD has been closed.
error = EBADF;
}
break;
case EPIPE:
// The peer closed connection or shutdown its read side. Reward
// the caller with a SIGPIPE.
if (gStackModule->is_syscall())
send_signal(find_thread(NULL), SIGPIPE);
break;
case B_TIMED_OUT:
// Translate non-blocking timeouts to the correct error code.
if (timeout == 0)
error = B_WOULD_BLOCK;
break;
}
RETURN_ERROR(error);
@ -496,12 +512,11 @@ UnixEndpoint::Receive(size_t numBytes, uint32 flags, net_buffer **_buffer)
UnixEndpointLocker locker(this);
if (fState != UNIX_ENDPOINT_CONNECTED)
// We can read as long as we have a FIFO. I.e. we are still connected, or
// disconnected and not yet reconnected/listening/closed.
if (fReceiveFifo == NULL)
RETURN_ERROR(ENOTCONN);
UnixEndpoint* peerEndpoint = fPeerEndpoint;
Reference<UnixEndpoint> peerReference(peerEndpoint);
// lock our FIFO
UnixFifo* fifo = fReceiveFifo;
Reference<UnixFifo> _(fifo);
@ -533,23 +548,28 @@ UnixEndpoint::Receive(size_t numBytes, uint32 flags, net_buffer **_buffer)
if (notifyWrite)
gSocketModule->notify(socket, B_SELECT_WRITE, writable);
if (error == UNIX_FIFO_SHUTDOWN) {
// This might either mean, that someone called shutdown() or close(),
// or our peer closed the connection.
if (fPeerEndpoint == peerEndpoint) {
// Orderly shutdown. Return B_OK, but a size of 0.
error = B_OK;
*_buffer = NULL;
} else if (fState == UNIX_ENDPOINT_CLOSED) {
// The FD has been closed.
error = EBADF;
} else {
// The connection has been closed by our peer.
error = ECONNRESET;
}
} else if (error == B_TIMED_OUT && timeout == 0) {
// translate non-blocking timeouts to the correct error code
error = B_WOULD_BLOCK;
switch (error) {
case UNIX_FIFO_SHUTDOWN:
// Either our socket was closed or read shutdown.
if (fState == UNIX_ENDPOINT_CLOSED) {
// The FD has been closed.
error = EBADF;
} else {
// if (fReceiveFifo == fifo) {
// Orderly shutdown or the peer closed the connection.
// } else {
// Weird case: Peer closed connection and we are already
// reconnected (or listening).
// }
error = B_OK;
*_buffer = NULL;
}
break;
case B_TIMED_OUT:
// translate non-blocking timeouts to the correct error code
if (timeout == 0)
error = B_WOULD_BLOCK;
break;
}
RETURN_ERROR(error);
@ -616,19 +636,19 @@ UnixEndpoint::Shutdown(int direction)
uint32 shutdown;
uint32 peerShutdown;
// translate the direction into shutdown flags
// translate the direction into shutdown flags for our and the peer fifo
switch (direction) {
case SHUT_RD:
shutdown = UNIX_FIFO_SHUTDOWN_READ;
peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
peerShutdown = 0;
break;
case SHUT_WR:
shutdown = UNIX_FIFO_SHUTDOWN_WRITE;
peerShutdown = UNIX_FIFO_SHUTDOWN_READ;
shutdown = 0;
peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
break;
case SHUT_RDWR:
shutdown = peerShutdown = UNIX_FIFO_SHUTDOWN_READ
| UNIX_FIFO_SHUTDOWN_WRITE;
shutdown = UNIX_FIFO_SHUTDOWN_READ;
peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
break;
default:
RETURN_ERROR(B_BAD_VALUE);
@ -649,7 +669,7 @@ UnixEndpoint::Shutdown(int direction)
// shutdown peer FIFO
fPeerEndpoint->fReceiveFifo->Lock();
fPeerEndpoint->fReceiveFifo->Shutdown(shutdown);
fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
fPeerEndpoint->fReceiveFifo->Unlock();
RETURN_ERROR(B_OK);
@ -678,12 +698,10 @@ UnixEndpoint::_Disconnect()
{
// Both endpoints must be locked.
// Shutdown and unset the receive FIFO.
// Write shutdown the receive FIFO.
fReceiveFifo->Lock();
fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_READ | UNIX_FIFO_SHUTDOWN_WRITE);
fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
fReceiveFifo->Unlock();
fReceiveFifo->RemoveReference();
fReceiveFifo = NULL;
// select() notification.
gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
@ -769,6 +787,16 @@ UnixEndpoint::_Unbind()
}
void
UnixEndpoint::_UnsetReceiveFifo()
{
if (fReceiveFifo) {
fReceiveFifo->RemoveReference();
fReceiveFifo = NULL;
}
}
void
UnixEndpoint::_StopListening()
{

View File

@ -101,6 +101,7 @@ private:
status_t _Bind(int32 internalID);
status_t _Unbind();
void _UnsetReceiveFifo();
void _StopListening();
private:

View File

@ -8,7 +8,7 @@
#include "unix.h"
#define UNIX_FIFO_DEBUG_LEVEL 1
#define UNIX_FIFO_DEBUG_LEVEL 2
#define UNIX_DEBUG_LEVEL UNIX_FIFO_DEBUG_LEVEL
#include "UnixDebug.h"
@ -64,6 +64,9 @@ UnixBufferQueue::Read(size_t size, net_buffer** _buffer)
return B_OK;
}
// transfer the ancillary data
gBufferModule->transfer_ancillary_data(nextBuffer, buffer);
if (nextBuffer->size > toCopy) {
// remove the part we've copied
gBufferModule->remove_header(nextBuffer, toCopy);
@ -92,6 +95,9 @@ UnixBufferQueue::Read(size_t size, net_buffer** _buffer)
return error;
}
// transfer the ancillary data
gBufferModule->transfer_ancillary_data(buffer, newBuffer);
// remove the part we've copied
gBufferModule->remove_header(buffer, size);
@ -174,11 +180,11 @@ UnixFifo::Shutdown(uint32 shutdown)
{
fShutdown |= shutdown;
if ((shutdown & UNIX_FIFO_SHUTDOWN_READ) != 0)
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
if ((shutdown & UNIX_FIFO_SHUTDOWN_WRITE) != 0)
if (shutdown != 0) {
// Shutting down either end also effects the other, so notify both.
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
}
}
@ -312,11 +318,20 @@ UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
if (IsReadShutdown())
return UNIX_FIFO_SHUTDOWN;
// wait for any data to become available
if (fBuffer.Readable() == 0 && timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
if (fBuffer.Readable() == 0) {
if (IsWriteShutdown()) {
*_buffer = NULL;
RETURN_ERROR(B_OK);
}
while (fBuffer.Readable() == 0 && !IsReadShutdown()) {
if (timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
}
// wait for any data to become available
// TODO: Support low water marks!
while (fBuffer.Readable() == 0
&& !IsReadShutdown() && !IsWriteShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fReaderSem, 1,
@ -331,6 +346,11 @@ UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
if (IsReadShutdown())
return UNIX_FIFO_SHUTDOWN;
if (fBuffer.Readable() == 0 && IsWriteShutdown()) {
*_buffer = NULL;
RETURN_ERROR(B_OK);
}
RETURN_ERROR(fBuffer.Read(numBytes, _buffer));
}
@ -357,11 +377,15 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
if (IsReadShutdown())
return EPIPE;
// wait for any space to become available
if (fBuffer.Writable() < request.size && timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
while (fBuffer.Writable() < request.size && !IsWriteShutdown()) {
while (fBuffer.Writable() < request.size && !IsWriteShutdown()
&& !IsReadShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fWriterSem, 1,
@ -376,5 +400,8 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
if (IsReadShutdown())
return EPIPE;
RETURN_ERROR(fBuffer.Write(buffer));
}

View File

@ -8,6 +8,9 @@
#include <new>
#include <AutoDeleter.h>
#include <fs/fd.h>
#include <lock.h>
#include <util/AutoLock.h>
#include <vfs.h>
@ -37,6 +40,23 @@ UnixAddressManager gAddressManager;
static struct net_domain *sDomain;
void
destroy_scm_rights_descriptors(const ancillary_data_header* header,
void* data)
{
int count = header->len / sizeof(file_descriptor*);
file_descriptor** descriptors = (file_descriptor**)data;
for (int i = 0; i < count; i++) {
if (descriptors[i] != NULL)
put_fd(descriptors[i]);
}
}
// #pragma mark -
net_protocol *
unix_init_protocol(net_socket *socket)
{
@ -247,6 +267,119 @@ unix_error_reply(net_protocol *protocol, net_buffer *causedError, uint32 code,
}
status_t
unix_attach_ancillary_data(net_protocol *self, net_buffer *buffer,
const cmsghdr *header)
{
TRACE("[%ld] unix_attach_ancillary_data(%p, %p, %p (level: %d, type: %d, "
"len: %d))\n", find_thread(NULL), self, buffer, header,
header->cmsg_level, header->cmsg_type, (int)header->cmsg_len);
// we support only SCM_RIGHTS
if (header->cmsg_level != SOL_SOCKET || header->cmsg_type != SCM_RIGHTS)
return B_BAD_VALUE;
int* fds = (int*)CMSG_DATA(header);
int count = (header->cmsg_len - CMSG_ALIGN(sizeof(cmsghdr))) / sizeof(int);
if (count == 0)
return B_BAD_VALUE;
file_descriptor** descriptors = new(std::nothrow) file_descriptor*[count];
if (descriptors == NULL)
return ENOBUFS;
ArrayDeleter<file_descriptor*> _(descriptors);
memset(descriptors, 0, sizeof(file_descriptor*) * count);
// get the file descriptors
io_context* ioContext = get_current_io_context(!gStackModule->is_syscall());
status_t error = B_OK;
for (int i = 0; i < count; i++) {
descriptors[i] = get_fd(ioContext, fds[i]);
if (descriptors[i] == NULL) {
error = EBADF;
break;
}
}
// attach the ancillary data to the buffer
if (error == B_OK) {
ancillary_data_header header;
header.level = SOL_SOCKET;
header.type = SCM_RIGHTS;
header.len = count * sizeof(file_descriptor*);
TRACE("[%ld] unix_attach_ancillary_data(): attaching %d FDs to "
"buffer\n", find_thread(NULL), count);
error = gBufferModule->attach_ancillary_data(buffer, &header,
descriptors, destroy_scm_rights_descriptors, NULL);
}
// cleanup on error
if (error != B_OK) {
for (int i = 0; i < count; i++) {
if (descriptors[i] != NULL)
put_fd(descriptors[i]);
}
}
return error;
}
ssize_t
unix_process_ancillary_data(net_protocol *self,
const ancillary_data_header *header, const void *data, void *buffer,
size_t bufferSize)
{
TRACE("[%ld] unix_process_ancillary_data(%p, %p (level: %d, type: %d, "
"len: %lu), %p, %p, %lu)\n", find_thread(NULL), self, header,
header->level, header->type, header->len, data, buffer, bufferSize);
// we support only SCM_RIGHTS
if (header->level != SOL_SOCKET || header->type != SCM_RIGHTS)
return B_BAD_VALUE;
int count = header->len / sizeof(file_descriptor*);
file_descriptor** descriptors = (file_descriptor**)data;
// check if there's enough space in the buffer
size_t neededBufferSpace = CMSG_SPACE(sizeof(int) * count);
if (bufferSize < neededBufferSpace)
return B_BAD_VALUE;
// init header
cmsghdr* messageHeader = (cmsghdr*)buffer;
messageHeader->cmsg_level = header->level;
messageHeader->cmsg_type = header->type;
messageHeader->cmsg_len = CMSG_LEN(sizeof(int) * count);
// create FDs for the current process
int* fds = (int*)CMSG_DATA(messageHeader);
io_context* ioContext = get_current_io_context(!gStackModule->is_syscall());
status_t error = B_OK;
for (int i = 0; i < count; i++) {
// get an additional reference which will go to the FD table index
inc_fd_ref_count(descriptors[i]);
fds[i] = new_fd(ioContext, descriptors[i]);
if (fds[i] < 0) {
error = fds[i];
put_fd(descriptors[i]);
// close FD indices
for (int k = i - 1; k >= 0; k--)
close_fd_index(ioContext, fds[k]);
break;
}
}
return error == B_OK ? neededBufferSpace : error;
}
// #pragma mark -
@ -335,6 +468,8 @@ net_protocol_module_info gUnixModule = {
unix_deliver_data,
unix_error,
unix_error_reply,
unix_attach_ancillary_data,
unix_process_ancillary_data
};
module_dependency module_dependencies[] = {