Protocol module for Unix domain stream type sockets. The implementation

is almost complete, but still quite buggy (receiving data has a good
chance to drop into KDL). 


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@24884 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-04-10 03:17:05 +00:00
parent f8b4d83f2f
commit eb8b342d56
12 changed files with 2435 additions and 0 deletions

View File

@ -4,3 +4,4 @@ SubInclude HAIKU_TOP src add-ons kernel network protocols icmp ;
SubInclude HAIKU_TOP src add-ons kernel network protocols ipv4 ;
SubInclude HAIKU_TOP src add-ons kernel network protocols tcp ;
SubInclude HAIKU_TOP src add-ons kernel network protocols udp ;
SubInclude HAIKU_TOP src add-ons kernel network protocols unix ;

View File

@ -0,0 +1,17 @@
SubDir HAIKU_TOP src add-ons kernel network protocols unix ;
UsePrivateKernelHeaders ;
UsePrivateHeaders net ;
UsePrivateHeaders shared ;
KernelAddon unix :
unix.cpp
UnixAddress.cpp
UnixEndpoint.cpp
UnixFifo.cpp
Referenceable.cpp
;
SEARCH on [ FGristFiles Referenceable.cpp ]
= [ FDirName $(HAIKU_TOP) src kits support ] ;

View File

@ -0,0 +1,306 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixAddress.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <util/khash.h>
#include <net_datalink.h>
#include <NetUtilities.h>
#include "unix.h"
static const sockaddr_un kEmptyAddress = {
4, // sun_len
AF_UNIX, // sun_family
{ '\0', '\0' } // sun_path
};
char*
UnixAddress::ToString(char *buffer, size_t bufferSize) const
{
if (!IsValid())
strlcpy(buffer, "<empty>", bufferSize);
else if (IsInternalAddress())
snprintf(buffer, bufferSize, "<%05lx>", fInternalID);
else
snprintf(buffer, bufferSize, "<%ld, %lld>", fVolumeID, fNodeID);
return buffer;
}
// #pragma mark -
static status_t
unix_copy_address(const sockaddr *from, sockaddr **to, bool replaceWithZeros,
const sockaddr *mask)
{
if (replaceWithZeros) {
sockaddr_un* newAddress = (sockaddr_un*)malloc(kEmptyAddress.sun_len);
if (newAddress == NULL)
return B_NO_MEMORY;
memcpy(newAddress, &kEmptyAddress, kEmptyAddress.sun_len);
*to = (sockaddr*)newAddress;
return B_OK;
} else {
if (from->sa_family != AF_UNIX)
return B_MISMATCHED_VALUES;
*to = (sockaddr*)malloc(from->sa_len);
if (*to == NULL)
return B_NO_MEMORY;
memcpy(*to, from, from->sa_len);
return B_OK;
}
}
static bool
unix_equal_addresses(const sockaddr *a, const sockaddr *b)
{
// NOTE: We compare syntactically only. The real check would involve
// looking up the node, if for FS addresses.
if (a->sa_len != b->sa_len)
return false;
return memcmp(a, b, a->sa_len) == 0;
}
static bool
unix_equal_ports(const sockaddr *a, const sockaddr *b)
{
// no ports
return true;
}
static bool
unix_equal_addresses_and_ports(const sockaddr *a, const sockaddr *b)
{
return unix_equal_addresses(a, b);
}
static bool
unix_equal_masked_addresses(const sockaddr *a, const sockaddr *b,
const sockaddr *mask)
{
// no masks
return unix_equal_addresses(a, b);
}
static bool
unix_is_empty_address(const sockaddr *address, bool checkPort)
{
return address->sa_len >= kEmptyAddress.sun_len
&& memcmp(address, &kEmptyAddress, kEmptyAddress.sun_len) == 0;
}
static int32
unix_first_mask_bit(const sockaddr *mask)
{
return 0;
}
static bool
unix_check_mask(const sockaddr *address)
{
return false;
}
static status_t
unix_print_address_buffer(const sockaddr *_address, char *buffer,
size_t bufferSize, bool printPort)
{
if (!buffer)
return B_BAD_VALUE;
sockaddr_un* address = (sockaddr_un*)_address;
if (address == NULL)
strlcpy(buffer, "<none>", bufferSize);
else if (address->sun_path[0] != '\0')
strlcpy(buffer, address->sun_path, bufferSize);
else if (address->sun_path[1] != '\0')
snprintf(buffer, bufferSize, "<%.5s>", address->sun_path + 1);
else
strlcpy(buffer, "<empty>", bufferSize);
return B_OK;
}
static status_t
unix_print_address(const sockaddr *address, char **_buffer, bool printPort)
{
char buffer[128];
status_t error = unix_print_address_buffer(address, buffer, sizeof(buffer),
printPort);
if (error != B_OK)
return error;
*_buffer = strdup(buffer);
return *_buffer != NULL ? B_OK : B_NO_MEMORY;
}
static uint16
unix_get_port(const sockaddr *address)
{
return 0;
}
static status_t
unix_set_port(sockaddr *address, uint16 port)
{
return B_BAD_VALUE;
}
static status_t
unix_set_to(sockaddr *address, const sockaddr *from)
{
if (address == NULL || from == NULL)
return B_BAD_VALUE;
if (from->sa_family != AF_UNIX)
return B_MISMATCHED_VALUES;
memcpy(address, from, from->sa_len);
return B_OK;
}
static status_t
unix_set_to_empty_address(sockaddr *address)
{
return unix_set_to(address, (sockaddr*)&kEmptyAddress);
}
static status_t
unix_mask_address(const sockaddr *address, const sockaddr *mask,
sockaddr *result)
{
// no masks
return unix_set_to(result, address);
}
static status_t
unix_set_to_defaults(sockaddr *defaultMask, sockaddr *defaultBroadcast,
sockaddr *address, sockaddr *netmask)
{
if (address == NULL)
return B_BAD_VALUE;
status_t error = B_OK;
if (defaultMask != NULL)
error = unix_set_to_empty_address(defaultMask);
if (error == B_OK && defaultBroadcast != NULL)
error = unix_set_to_empty_address(defaultBroadcast);
return error;
}
static status_t
unix_update_to(sockaddr *address, const sockaddr *from)
{
if (address == NULL || from == NULL)
return B_BAD_VALUE;
if (unix_is_empty_address(from, false))
return B_OK;
return unix_set_to(address, from);
}
static uint32
hash_address(const sockaddr_un* address)
{
if (address == NULL)
return 0;
if (address->sun_path[0] == '\0') {
char buffer[6];
strlcpy(buffer, address->sun_path + 1, 6);
return hash_hash_string(buffer);
}
return hash_hash_string(address->sun_path);
}
static uint32
unix_hash_address_pair(const sockaddr *ourAddress, const sockaddr *peerAddress)
{
return hash_address((sockaddr_un*)ourAddress) * 17
+ hash_address((sockaddr_un*)peerAddress);
}
static status_t
unix_checksum_address(struct Checksum *checksum, const sockaddr *_address)
{
if (checksum == NULL || _address == NULL)
return B_BAD_VALUE;
sockaddr_un* address = (sockaddr_un*)_address;
int len = (char*)address + address->sun_len - address->sun_path;
for (int i = 0; i < len; i++)
(*checksum) << (uint8)address->sun_path[i];
return B_OK;
}
net_address_module_info gAddressModule = {
{
NULL,
0,
NULL
},
unix_copy_address,
unix_mask_address,
unix_equal_addresses,
unix_equal_ports,
unix_equal_addresses_and_ports,
unix_equal_masked_addresses,
unix_is_empty_address,
unix_first_mask_bit,
unix_check_mask,
unix_print_address,
unix_print_address_buffer,
unix_get_port,
unix_set_port,
unix_set_to,
unix_set_to_empty_address,
unix_set_to_defaults,
unix_update_to,
unix_hash_address_pair,
unix_checksum_address,
NULL // matches_broadcast_address
};

View File

@ -0,0 +1,174 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_ADDRESS_H
#define UNIX_ADDRESS_H
#include <sys/un.h>
#include <SupportDefs.h>
// NOTE: We support the standard FS address space as well as the alternative
// internal address space Linux features (sun_path[0] is 0, followed by 5 hex
// digits, without null-termination). The latter one is nice to have, because
// the address lookup is quick (hash table lookup, instead of asking the VFS to
// resolve the path), and we don't have to pollute the FS when auto-binding
// sockets (e.g. on connect()).
#define INTERNAL_UNIX_ADDRESS_LEN (2 + 1 + 5)
// sun_len + sun_family + null byte + 5 hex digits
struct vnode;
class UnixAddress {
public:
UnixAddress()
{
Unset();
}
UnixAddress(const UnixAddress& other)
{
*this = other;
}
UnixAddress(int32 internalID)
{
SetTo(internalID);
}
UnixAddress(dev_t volumeID, ino_t nodeID, struct vnode* vnode)
{
SetTo(volumeID, nodeID, vnode);
}
void SetTo(int32 internalID)
{
fInternalID = internalID;
fVolumeID = -1;
fNodeID = -1;
fVnode = NULL;
}
void SetTo(dev_t volumeID, ino_t nodeID, struct vnode* vnode)
{
fInternalID = -1;
fVolumeID = volumeID;
fNodeID = nodeID;
fVnode = vnode;
}
void Unset()
{
fInternalID = -1;
fVolumeID = -1;
fNodeID = -1;
fVnode = NULL;
}
bool IsValid() const
{
return fInternalID >= 0 || fVolumeID >= 0;
}
bool IsInternalAddress() const
{
return fInternalID >= 0;
}
int32 InternalID() const
{
return fInternalID;
}
int32 VolumeID() const
{
return fVolumeID;
}
int32 NodeID() const
{
return fNodeID;
}
struct vnode* Vnode() const
{
return fVnode;
}
uint32 HashCode() const
{
return fInternalID >= 0
? fInternalID
: uint32(fVolumeID) ^ uint32(fNodeID);
}
char* ToString(char *buffer, size_t bufferSize) const;
UnixAddress& operator=(const UnixAddress& other)
{
fInternalID = other.fInternalID;
fVolumeID = other.fVolumeID;
fNodeID = other.fNodeID;
fVnode = other.fVnode;
return *this;
}
bool operator==(const UnixAddress& other) const
{
return fInternalID >= 0
? fInternalID == other.fInternalID
: fVolumeID == other.fVolumeID
&& fNodeID == other.fNodeID;
}
bool operator!=(const UnixAddress& other) const
{
return !(*this == other);
}
static bool IsEmptyAddress(const sockaddr_un& address)
{
return address.sun_len == sizeof(sockaddr)
&& address.sun_path[0] == '\0' && address.sun_path[1] == '\0';
}
static int32 InternalID(const sockaddr_un& address)
{
if (address.sun_len < INTERNAL_UNIX_ADDRESS_LEN
|| address.sun_path[0] != '\0') {
return B_BAD_VALUE;
}
// parse the ID
int32 id = 0;
for (int32 i = 0; i < 5; i++) {
char c = address.sun_path[i + 1];
if (c >= '0' && c <= '9')
id = (id << 4) + (c - '0');
else if (c >= 'a' && c <= 'f')
id = (id << 4) + 10 + (c - 'a');
else
return B_BAD_VALUE;
}
return id;
}
private:
// fat interface: If fInternalID is >= 0, it's an address in the internal
// namespace, otherwise a FS address.
int32 fInternalID;
dev_t fVolumeID;
ino_t fNodeID;
struct vnode* fVnode;
};
#endif // UNIX_ADDRESS_H

View File

@ -0,0 +1,119 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_ADDRESS_MANAGER_H
#define UNIX_ADDRESS_MANAGER_H
#include <lock.h>
#include <util/OpenHashTable.h>
#include "UnixAddress.h"
#include "UnixEndpoint.h"
struct UnixAddressHashDefinition {
typedef UnixAddress KeyType;
typedef UnixEndpoint ValueType;
size_t HashKey(const KeyType& key) const
{
return key.HashCode();
}
size_t Hash(UnixEndpoint* endpoint) const
{
return HashKey(endpoint->Address());
}
bool Compare(const KeyType& key, UnixEndpoint* endpoint) const
{
return key == endpoint->Address();
}
HashTableLink<UnixEndpoint>* GetLink(UnixEndpoint* endpoint) const
{
return endpoint->HashTableLink();
}
};
class UnixAddressManager {
public:
UnixAddressManager()
{
fLock.sem = -1;
}
~UnixAddressManager()
{
if (fLock.sem >= 0)
benaphore_destroy(&fLock);
}
status_t Init()
{
status_t error = fBoundEndpoints.InitCheck();
if (error != B_OK)
return error;
return benaphore_init(&fLock, "unix address manager");
}
bool Lock()
{
return benaphore_lock(&fLock) == B_OK;
}
void Unlock()
{
benaphore_unlock(&fLock);
}
UnixEndpoint* Lookup(const UnixAddress& address) const
{
return fBoundEndpoints.Lookup(address);
}
void Add(UnixEndpoint* endpoint)
{
fBoundEndpoints.Insert(endpoint);
}
void Remove(UnixEndpoint* endpoint)
{
fBoundEndpoints.Remove(endpoint);
}
int32 NextInternalID()
{
int32 id = fNextInternalID;
fNextInternalID = (id + 1) & 0xfffff;
return id;
}
int32 NextUnusedInternalID()
{
for (int32 i = 0xfffff; i >= 0; i--) {
int32 id = NextInternalID();
UnixAddress address(id);
if (Lookup(address) == NULL)
return id;
}
return ENOBUFS;
}
private:
typedef OpenHashTable<UnixAddressHashDefinition, false> EndpointTable;
benaphore fLock;
EndpointTable fBoundEndpoints;
int32 fNextInternalID;
};
typedef AutoLocker<UnixAddressManager> UnixAddressManagerLocker;
#endif // UNIX_ADDRESS_MANAGER_H

View File

@ -0,0 +1,49 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_DEBUG_H
#define UNIX_DEBUG_H
#include <Drivers.h>
#if UNIX_DEBUG_LEVEL
# define TRACE(args...) dprintf(args)
# define PRINT_ERROR(error) \
do { \
dprintf("[%ld] l. %d: %s: %s\n", find_thread(NULL), \
__LINE__, __PRETTY_FUNCTION__, strerror(error)); \
} while (false)
# if UNIX_DEBUG_LEVEL >= 2
# define REPORT_ERROR(error) PRINT_ERROR(error)
# define RETURN_ERROR(error) \
do { \
__typeof(error) error_RETURN_ERROR = (error); \
PRINT_ERROR(error_RETURN_ERROR); \
return error_RETURN_ERROR; \
} while (false)
# else
# define REPORT_ERROR(error) \
do { \
__typeof(error) error_REPORT_ERROR = (error); \
if (error_REPORT_ERROR < 0) \
PRINT_ERROR(error_REPORT_ERROR); \
} while (false)
# define RETURN_ERROR(error) \
do { \
__typeof(error) error_RETURN_ERROR = (error); \
if (error_RETURN_ERROR < 0) \
PRINT_ERROR(error_RETURN_ERROR); \
return error_RETURN_ERROR; \
} while (false)
# endif
#else
# define TRACE(args...) do {} while (false)
# define REPORT_ERROR(error)
# define RETURN_ERROR(error)
#endif
#endif // UNIX_DEBUG_H

View File

@ -0,0 +1,783 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixEndpoint.h"
#include <stdio.h>
#include <sys/stat.h>
#include <AutoDeleter.h>
#include <vfs.h>
#include "UnixAddressManager.h"
#include "UnixFifo.h"
#define UNIX_ENDPOINT_DEBUG_LEVEL 2
#define UNIX_DEBUG_LEVEL UNIX_ENDPOINT_DEBUG_LEVEL
#include "UnixDebug.h"
// Note on locking order (outermost -> innermost):
// UnixEndpoint: connecting -> listening -> child
// -> UnixFifo (never lock more than one at a time)
// -> UnixAddressManager
static inline bigtime_t
absolute_timeout(bigtime_t timeout)
{
if (timeout == 0 || timeout == B_INFINITE_TIMEOUT)
return timeout;
// TODO: Make overflow safe!
return timeout + system_time();
}
UnixEndpoint::UnixEndpoint(net_socket* socket)
:
ProtocolSocket(socket),
fAddress(),
fAddressHashLink(),
fPeerEndpoint(NULL),
fReceiveFifo(NULL),
fState(UNIX_ENDPOINT_CLOSED),
fAcceptSemaphore(-1),
fIsChild(false)
{
TRACE("[%ld] %p->UnixEndpoint::UnixEndpoint()\n", find_thread(NULL), this);
fLock.sem = -1;
}
UnixEndpoint::~UnixEndpoint()
{
TRACE("[%ld] %p->UnixEndpoint::~UnixEndpoint()\n", find_thread(NULL), this);
if (fLock.sem >= 0)
benaphore_destroy(&fLock);
}
status_t
UnixEndpoint::Init()
{
TRACE("[%ld] %p->UnixEndpoint::Init()\n", find_thread(NULL), this);
status_t error = benaphore_init(&fLock, "unix endpoint");
if (error != B_OK)
RETURN_ERROR(ENOBUFS);
RETURN_ERROR(B_OK);
}
void
UnixEndpoint::Uninit()
{
TRACE("[%ld] %p->UnixEndpoint::Uninit()\n", find_thread(NULL), this);
// check whether we're closed
UnixEndpointLocker locker(this);
bool closed = (fState == UNIX_ENDPOINT_CLOSED);
locker.Unlock();
if (!closed) {
// That probably means, we're a child endpoint of a listener and
// have been fully connected, but not yet accepted. Our Close()
// hook isn't called in this case. Do it manually.
Close();
}
RemoveReference();
}
status_t
UnixEndpoint::Open()
{
TRACE("[%ld] %p->UnixEndpoint::Open()\n", find_thread(NULL), this);
status_t error = ProtocolSocket::Open();
if (error != B_OK)
RETURN_ERROR(error);
fState = UNIX_ENDPOINT_NOT_CONNECTED;
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Close()
{
TRACE("[%ld] %p->UnixEndpoint::Close()\n", find_thread(NULL), this);
UnixEndpointLocker locker(this);
if (fState == UNIX_ENDPOINT_CONNECTED) {
UnixEndpointLocker peerLocker;
if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
// We're still connected. Disconnect both endpoints!
fPeerEndpoint->_Disconnect();
_Disconnect();
}
}
if (fState == UNIX_ENDPOINT_LISTENING)
_StopListening();
_Unbind();
fState = UNIX_ENDPOINT_CLOSED;
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Free()
{
TRACE("[%ld] %p->UnixEndpoint::Free()\n", find_thread(NULL), this);
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Bind(const struct sockaddr *_address)
{
if (_address->sa_family != AF_UNIX)
RETURN_ERROR(EAFNOSUPPORT);
TRACE("[%ld] %p->UnixEndpoint::Bind(\"%s\")\n", find_thread(NULL), this,
ConstSocketAddress(&gAddressModule, _address).AsString().Data());
const sockaddr_un* address = (const sockaddr_un*)_address;
UnixEndpointLocker endpointLocker(this);
if (fState != UNIX_ENDPOINT_NOT_CONNECTED || IsBound())
RETURN_ERROR(B_BAD_VALUE);
if (address->sun_path[0] == '\0') {
UnixAddressManagerLocker addressLocker(gAddressManager);
// internal address space (or empty address)
int32 internalID;
if (UnixAddress::IsEmptyAddress(*address))
internalID = gAddressManager.NextUnusedInternalID();
else
internalID = UnixAddress::InternalID(*address);
if (internalID < 0)
RETURN_ERROR(internalID);
status_t error = _Bind(internalID);
if (error != B_OK)
RETURN_ERROR(error);
sockaddr_un* outAddress = (sockaddr_un*)&socket->address;
outAddress->sun_path[0] = '\0';
sprintf(outAddress->sun_path + 1, "%05lx", internalID);
outAddress->sun_len = INTERNAL_UNIX_ADDRESS_LEN;
// null-byte + 5 hex digits
gAddressManager.Add(this);
} else {
// FS address space
size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
if (pathLen == 0 || pathLen == sizeof(address->sun_path))
RETURN_ERROR(B_BAD_VALUE);
bool kernel = false;
// TODO: We don't have the info at this point!
struct vnode* vnode;
status_t error = vfs_create_special_node(address->sun_path,
NULL, S_IFSOCK | 0644, 0, kernel, NULL, &vnode);
if (error != B_OK)
RETURN_ERROR(error == B_FILE_EXISTS ? EADDRINUSE : error);
error = _Bind(vnode);
if (error != B_OK) {
vfs_put_vnode(vnode);
RETURN_ERROR(error);
}
size_t addressLen = address->sun_path + pathLen + 1 - (char*)address;
memcpy(&socket->address, address, addressLen);
socket->address.ss_len = addressLen;
UnixAddressManagerLocker addressLocker(gAddressManager);
gAddressManager.Add(this);
}
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Unbind()
{
TRACE("[%ld] %p->UnixEndpoint::Unbind()\n", find_thread(NULL), this);
UnixEndpointLocker endpointLocker(this);
RETURN_ERROR(_Unbind());
}
status_t
UnixEndpoint::Listen(int backlog)
{
TRACE("[%ld] %p->UnixEndpoint::Listen(%d)\n", find_thread(NULL), this,
backlog);
UnixEndpointLocker endpointLocker(this);
if (!IsBound())
RETURN_ERROR(EDESTADDRREQ);
if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
RETURN_ERROR(EINVAL);
gSocketModule->set_max_backlog(socket, backlog);
fAcceptSemaphore = create_sem(0, "unix accept");
if (fAcceptSemaphore < 0)
RETURN_ERROR(ENOBUFS);
fState = UNIX_ENDPOINT_LISTENING;
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Connect(const struct sockaddr *_address)
{
if (_address->sa_family != AF_UNIX)
RETURN_ERROR(EAFNOSUPPORT);
TRACE("[%ld] %p->UnixEndpoint::Connect(\"%s\")\n", find_thread(NULL), this,
ConstSocketAddress(&gAddressModule, _address).AsString().Data());
const sockaddr_un* address = (const sockaddr_un*)_address;
UnixEndpointLocker endpointLocker(this);
if (fState == UNIX_ENDPOINT_CONNECTED)
RETURN_ERROR(EISCONN);
if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
RETURN_ERROR(B_BAD_VALUE);
// TODO: If listening, we could set the backlog to 0 and connect.
// check the address first
UnixAddress unixAddress;
if (address->sun_path[0] == '\0') {
// internal address space (or empty address)
int32 internalID;
if (UnixAddress::IsEmptyAddress(*address))
RETURN_ERROR(B_BAD_VALUE);
internalID = UnixAddress::InternalID(*address);
if (internalID < 0)
RETURN_ERROR(internalID);
unixAddress.SetTo(internalID);
} else {
// FS address space
size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
if (pathLen == 0 || pathLen == sizeof(address->sun_path))
RETURN_ERROR(B_BAD_VALUE);
bool kernel = false;
// TODO: We don't have the info at this point!
struct stat st;
status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
kernel);
if (error != B_OK)
RETURN_ERROR(error);
if (!S_ISSOCK(st.st_mode))
RETURN_ERROR(B_BAD_VALUE);
unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
}
// get the peer endpoint
UnixAddressManagerLocker addressLocker(gAddressManager);
UnixEndpoint* listeningEndpoint = gAddressManager.Lookup(unixAddress);
if (listeningEndpoint == NULL)
RETURN_ERROR(ECONNREFUSED);
Reference<UnixEndpoint> peerReference(listeningEndpoint);
addressLocker.Unlock();
UnixEndpointLocker peerLocker(listeningEndpoint);
if (!listeningEndpoint->IsBound()
|| listeningEndpoint->fState != UNIX_ENDPOINT_LISTENING
|| listeningEndpoint->fAddress != unixAddress) {
RETURN_ERROR(ECONNREFUSED);
}
// Allocate FIFOs for us and the socket we're going to spawn. We do that
// now, so that the mess we need to cleanup, if allocating them fails, is
// harmless.
UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
ObjectDeleter<UnixFifo> fifoDeleter(fifo);
ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
status_t error;
if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
return error;
// spawn new endpoint for accept()
net_socket* newSocket;
error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
&newSocket);
if (error != B_OK)
RETURN_ERROR(error);
// init connected peer endpoint
UnixEndpoint* connectedEndpoint = (UnixEndpoint*)newSocket->first_protocol;
UnixEndpointLocker connectedLocker(connectedEndpoint);
connectedEndpoint->_Spawn(this, peerFifo);
fPeerEndpoint = connectedEndpoint;
PeerAddress().SetTo(&connectedEndpoint->socket->address);
fPeerEndpoint->AddReference();
fReceiveFifo = fifo;
fifoDeleter.Detach();
peerFifoDeleter.Detach();
fState = UNIX_ENDPOINT_CONNECTED;
gSocketModule->set_connected(newSocket);
release_sem(listeningEndpoint->fAcceptSemaphore);
connectedLocker.Unlock();
peerLocker.Unlock();
endpointLocker.Unlock();
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::Accept(net_socket **_acceptedSocket)
{
TRACE("[%ld] %p->UnixEndpoint::Accept()\n", find_thread(NULL), this);
bigtime_t timeout = absolute_timeout(socket->receive.timeout);
if (gStackModule->is_restarted_syscall())
timeout = gStackModule->restore_syscall_restart_timeout();
else
gStackModule->store_syscall_restart_timeout(timeout);
UnixEndpointLocker locker(this);
status_t error;
do {
locker.Unlock();
error = acquire_sem_etc(fAcceptSemaphore, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
if (error < B_OK)
RETURN_ERROR(error);
locker.Lock();
error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
} while (error != B_OK);
if (error == B_TIMED_OUT && timeout == 0) {
// translate non-blocking timeouts to the correct error code
error = B_WOULD_BLOCK;
}
RETURN_ERROR(error);
}
status_t
UnixEndpoint::Send(net_buffer *buffer)
{
TRACE("[%ld] %p->UnixEndpoint::Send(%p)\n", find_thread(NULL), this,
buffer);
bigtime_t timeout = absolute_timeout(socket->send.timeout);
if (gStackModule->is_restarted_syscall())
timeout = gStackModule->restore_syscall_restart_timeout();
else
gStackModule->store_syscall_restart_timeout(timeout);
UnixEndpointLocker locker(this);
UnixEndpointLocker peerLocker;
status_t error = _LockConnectedEndpoints(locker, peerLocker);
if (error != B_OK)
RETURN_ERROR(error);
UnixEndpoint* peerEndpoint = fPeerEndpoint;
Reference<UnixEndpoint> peerReference(peerEndpoint);
// lock the peer's FIFO
UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
Reference<UnixFifo> _(peerFifo);
UnixFifoLocker fifoLocker(peerFifo);
// unlock endpoints
locker.Unlock();
peerLocker.Unlock();
error = peerFifo->Write(buffer, timeout);
// Notify select()ing readers, if we successfully wrote anything.
size_t readable = peerFifo->Readable();
bool notifyRead = (error == B_OK && readable > 0
&& !peerFifo->IsReadShutdown());
// Notify select()ing writers, if we failed to write anything and there's
// still room to write.
size_t writable = peerFifo->Writable();
bool notifyWrite = (error != B_OK && writable > 0
&& !peerFifo->IsWriteShutdown());
// re-lock our endpoint (unlock FIFO to respect locking order)
fifoLocker.Unlock();
locker.Lock();
// send notifications
if (notifyRead)
gSocketModule->notify(socket, B_SELECT_READ, readable);
if (notifyWrite)
gSocketModule->notify(socket, B_SELECT_WRITE, writable);
if (error == UNIX_FIFO_SHUTDOWN) {
// This might either mean, that someone called shutdown() or close(),
// or our peer closed the connection.
if (fPeerEndpoint == peerEndpoint) {
// Orderly shutdown.
error = EPIPE;
} else if (fState == UNIX_ENDPOINT_CLOSED) {
// The FD has been closed.
error = EBADF;
} else {
// Peer closed the connection.
error = EPIPE;
send_signal(find_thread(NULL), SIGPIPE);
}
} else if (error == B_TIMED_OUT && timeout == 0) {
// translate non-blocking timeouts to the correct error code
error = B_WOULD_BLOCK;
}
RETURN_ERROR(error);
}
status_t
UnixEndpoint::Receive(size_t numBytes, uint32 flags, net_buffer **_buffer)
{
TRACE("[%ld] %p->UnixEndpoint::Receive(%ld, 0x%lx)\n", find_thread(NULL),
this, numBytes, flags);
bigtime_t timeout = absolute_timeout(socket->receive.timeout);
if (gStackModule->is_restarted_syscall())
timeout = gStackModule->restore_syscall_restart_timeout();
else
gStackModule->store_syscall_restart_timeout(timeout);
UnixEndpointLocker locker(this);
if (fState != UNIX_ENDPOINT_CONNECTED)
RETURN_ERROR(ENOTCONN);
UnixEndpoint* peerEndpoint = fPeerEndpoint;
Reference<UnixEndpoint> peerReference(peerEndpoint);
// lock our FIFO
UnixFifo* fifo = fReceiveFifo;
Reference<UnixFifo> _(fifo);
UnixFifoLocker fifoLocker(fifo);
// unlock endpoint
locker.Unlock();
status_t error = fifo->Read(numBytes, timeout, _buffer);
// Notify select()ing writers, if we successfully read anything.
size_t writable = fifo->Writable();
bool notifyWrite = (error == B_OK && writable > 0
&& !fifo->IsWriteShutdown());
// Notify select()ing readers, if we failed to read anything and there's
// still something left to read.
size_t readable = fifo->Readable();
bool notifyRead = (error != B_OK && readable > 0
&& !fifo->IsReadShutdown());
// re-lock our endpoint (unlock FIFO to respect locking order)
fifoLocker.Unlock();
locker.Lock();
// send notifications
if (notifyRead)
gSocketModule->notify(socket, B_SELECT_READ, readable);
if (notifyWrite)
gSocketModule->notify(socket, B_SELECT_WRITE, writable);
if (error == UNIX_FIFO_SHUTDOWN) {
// This might either mean, that someone called shutdown() or close(),
// or our peer closed the connection.
if (fPeerEndpoint == peerEndpoint) {
// Orderly shutdown. Return B_OK, but a size of 0.
error = B_OK;
*_buffer = NULL;
} else if (fState == UNIX_ENDPOINT_CLOSED) {
// The FD has been closed.
error = EBADF;
} else {
// The connection has been closed by our peer.
error = ECONNRESET;
}
} else if (error == B_TIMED_OUT && timeout == 0) {
// translate non-blocking timeouts to the correct error code
error = B_WOULD_BLOCK;
}
RETURN_ERROR(error);
}
ssize_t
UnixEndpoint::Sendable()
{
TRACE("[%ld] %p->UnixEndpoint::Sendable()\n", find_thread(NULL), this);
UnixEndpointLocker locker(this);
UnixEndpointLocker peerLocker;
status_t error = _LockConnectedEndpoints(locker, peerLocker);
if (error != B_OK)
RETURN_ERROR(error);
// lock the peer's FIFO
UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
UnixFifoLocker fifoLocker(peerFifo);
RETURN_ERROR(peerFifo->Writable());
}
ssize_t
UnixEndpoint::Receivable()
{
TRACE("[%ld] %p->UnixEndpoint::Receivable()\n", find_thread(NULL), this);
UnixEndpointLocker locker(this);
if (fState != UNIX_ENDPOINT_CONNECTED)
RETURN_ERROR(ENOTCONN);
UnixFifoLocker fifoLocker(fReceiveFifo);
RETURN_ERROR(fReceiveFifo->Readable());
}
void
UnixEndpoint::SetReceiveBufferSize(size_t size)
{
TRACE("[%ld] %p->UnixEndpoint::SetReceiveBufferSize(%lu)\n",
find_thread(NULL), this, size);
UnixEndpointLocker locker(this);
if (fState != UNIX_ENDPOINT_CONNECTED)
return;
UnixFifoLocker fifoLocker(fReceiveFifo);
fReceiveFifo->SetBufferCapacity(size);
}
status_t
UnixEndpoint::Shutdown(int direction)
{
TRACE("[%ld] %p->UnixEndpoint::SetReceiveBufferSize(%d)\n",
find_thread(NULL), this, direction);
uint32 shutdown;
uint32 peerShutdown;
// translate the direction into shutdown flags
switch (direction) {
case SHUT_RD:
shutdown = UNIX_FIFO_SHUTDOWN_READ;
peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
break;
case SHUT_WR:
shutdown = UNIX_FIFO_SHUTDOWN_WRITE;
peerShutdown = UNIX_FIFO_SHUTDOWN_READ;
break;
case SHUT_RDWR:
shutdown = peerShutdown = UNIX_FIFO_SHUTDOWN_READ
| UNIX_FIFO_SHUTDOWN_WRITE;
break;
default:
RETURN_ERROR(B_BAD_VALUE);
}
// lock endpoints
UnixEndpointLocker locker(this);
UnixEndpointLocker peerLocker;
status_t error = _LockConnectedEndpoints(locker, peerLocker);
if (error != B_OK)
RETURN_ERROR(error);
// shutdown our FIFO
fReceiveFifo->Lock();
fReceiveFifo->Shutdown(shutdown);
fReceiveFifo->Unlock();
// shutdown peer FIFO
fPeerEndpoint->fReceiveFifo->Lock();
fPeerEndpoint->fReceiveFifo->Shutdown(shutdown);
fPeerEndpoint->fReceiveFifo->Unlock();
RETURN_ERROR(B_OK);
}
void
UnixEndpoint::_Spawn(UnixEndpoint* connectingEndpoint, UnixFifo* fifo)
{
ProtocolSocket::Open();
fIsChild = true;
fPeerEndpoint = connectingEndpoint;
fPeerEndpoint->AddReference();
fReceiveFifo = fifo;
PeerAddress().SetTo(&connectingEndpoint->socket->address);
fState = UNIX_ENDPOINT_CONNECTED;
}
void
UnixEndpoint::_Disconnect()
{
// Both endpoints must be locked.
// Shutdown and unset the receive FIFO.
fReceiveFifo->Lock();
fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_READ | UNIX_FIFO_SHUTDOWN_WRITE);
fReceiveFifo->Unlock();
fReceiveFifo->RemoveReference();
fReceiveFifo = NULL;
// select() notification.
gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
gSocketModule->notify(socket, B_SELECT_ERROR, ECONNRESET);
// Unset the peer endpoint.
fPeerEndpoint->RemoveReference();
fPeerEndpoint = NULL;
// We're officially disconnected.
// TODO: Deal with non accept()ed connections correctly!
fIsChild = false;
fState = UNIX_ENDPOINT_NOT_CONNECTED;
}
status_t
UnixEndpoint::_LockConnectedEndpoints(UnixEndpointLocker& locker,
UnixEndpointLocker& peerLocker)
{
if (fState != UNIX_ENDPOINT_CONNECTED)
RETURN_ERROR(ENOTCONN);
// We need to lock the peer, too. Get a reference -- we might need to
// unlock ourselves to get the locking order right.
Reference<UnixEndpoint> peerReference(fPeerEndpoint);
UnixEndpoint* peerEndpoint = fPeerEndpoint;
if (fIsChild) {
// We're the child, but locking order is the other way around.
locker.Unlock();
peerLocker.SetTo(peerEndpoint, false);
locker.Lock();
// recheck our state, also whether the peer is still the same
if (fState != UNIX_ENDPOINT_CONNECTED || peerEndpoint != fPeerEndpoint)
RETURN_ERROR(ENOTCONN);
} else
peerLocker.SetTo(peerEndpoint, false);
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::_Bind(struct vnode* vnode)
{
struct stat st;
status_t error = vfs_stat_vnode(vnode, &st);
if (error != B_OK)
RETURN_ERROR(error);
fAddress.SetTo(st.st_dev, st.st_ino, vnode);
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::_Bind(int32 internalID)
{
fAddress.SetTo(internalID);
RETURN_ERROR(B_OK);
}
status_t
UnixEndpoint::_Unbind()
{
if (fState == UNIX_ENDPOINT_CONNECTED || fState == UNIX_ENDPOINT_LISTENING)
RETURN_ERROR(B_BAD_VALUE);
if (IsBound()) {
UnixAddressManagerLocker addressLocker(gAddressManager);
gAddressManager.Remove(this);
if (struct vnode* vnode = fAddress.Vnode())
vfs_put_vnode(vnode);
fAddress.Unset();
}
RETURN_ERROR(B_OK);
}
void
UnixEndpoint::_StopListening()
{
if (fState == UNIX_ENDPOINT_LISTENING) {
delete_sem(fAcceptSemaphore);
fAcceptSemaphore = -1;
fState = UNIX_ENDPOINT_NOT_CONNECTED;
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_ENDPOINT_H
#define UNIX_ENDPOINT_H
#include <sys/stat.h>
#include <Referenceable.h>
#include <lock.h>
#include <util/DoublyLinkedList.h>
#include <util/OpenHashTable.h>
#include <vfs.h>
#include <net_protocol.h>
#include <net_socket.h>
#include <ProtocolUtilities.h>
#include "unix.h"
#include "UnixAddress.h"
class UnixEndpoint;
class UnixFifo;
enum unix_endpoint_state {
UNIX_ENDPOINT_NOT_CONNECTED,
UNIX_ENDPOINT_LISTENING,
UNIX_ENDPOINT_CONNECTED,
UNIX_ENDPOINT_CLOSED
};
typedef AutoLocker<UnixEndpoint> UnixEndpointLocker;
class UnixEndpoint : public net_protocol, public ProtocolSocket,
public Referenceable {
public:
UnixEndpoint(net_socket* socket);
virtual ~UnixEndpoint();
status_t Init();
void Uninit();
status_t Open();
status_t Close();
status_t Free();
bool Lock()
{
return benaphore_lock(&fLock) == B_OK;
}
void Unlock()
{
benaphore_unlock(&fLock);
}
status_t Bind(const struct sockaddr *_address);
status_t Unbind();
status_t Listen(int backlog);
status_t Connect(const struct sockaddr *address);
status_t Accept(net_socket **_acceptedSocket);
status_t Send(net_buffer *buffer);
status_t Receive(size_t numBytes, uint32 flags, net_buffer **_buffer);
ssize_t Sendable();
ssize_t Receivable();
void SetReceiveBufferSize(size_t size);
status_t Shutdown(int direction);
bool IsBound() const
{
return !fIsChild && fAddress.IsValid();
}
const UnixAddress& Address() const
{
return fAddress;
}
HashTableLink<UnixEndpoint>* HashTableLink()
{
return &fAddressHashLink;
}
private:
void _Spawn(UnixEndpoint* connectingEndpoint, UnixFifo* fifo);
void _Disconnect();
status_t _LockConnectedEndpoints(UnixEndpointLocker& locker,
UnixEndpointLocker& peerLocker);
status_t _Bind(struct vnode* vnode);
status_t _Bind(int32 internalID);
status_t _Unbind();
void _StopListening();
private:
benaphore fLock;
UnixAddress fAddress;
::HashTableLink<UnixEndpoint> fAddressHashLink;
UnixEndpoint* fPeerEndpoint;
UnixFifo* fReceiveFifo;
unix_endpoint_state fState;
sem_id fAcceptSemaphore;
bool fIsChild;
};
#endif // UNIX_ENDPOINT_H

View File

@ -0,0 +1,369 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixFifo.h"
#include "unix.h"
UnixBufferQueue::UnixBufferQueue(size_t capacity)
:
fSize(0),
fCapacity(capacity)
{
}
UnixBufferQueue::~UnixBufferQueue()
{
while (net_buffer* buffer = fBuffers.RemoveHead())
gBufferModule->free(buffer);
}
status_t
UnixBufferQueue::Read(size_t size, net_buffer** _buffer)
{
if (size > fSize)
size = fSize;
if (size == 0)
return B_BAD_VALUE;
// If the first buffer has the right size or is smaller, we can just
// dequeue it.
net_buffer* buffer = fBuffers.Head();
if (buffer->size <= size) {
fBuffers.RemoveHead();
fSize -= buffer->size;
*_buffer = buffer;
if (buffer->size == size)
return B_OK;
// buffer is too small
size_t bytesLeft = size - buffer->size;
// Append from the following buffers, until we've read as much as we're
// supposed to.
while (bytesLeft > 0) {
net_buffer* nextBuffer = fBuffers.Head();
size_t toCopy = min_c(bytesLeft, nextBuffer->size);
if (gBufferModule->append_cloned(buffer, nextBuffer, 0, toCopy)
!= B_OK) {
// Too bad, but we've got some data, so we don't fail.
return B_OK;
}
if (nextBuffer->size > toCopy) {
// remove the part we've copied
gBufferModule->remove_header(nextBuffer, toCopy);
} else {
// get rid of the buffer completely
fBuffers.RemoveHead();
gBufferModule->free(nextBuffer);
}
bytesLeft -= toCopy;
}
return B_OK;
}
// buffer is too big
// Create a new buffer, and copy into it, as much as we need.
net_buffer* newBuffer = gBufferModule->create(256);
if (newBuffer == NULL)
return ENOBUFS;
status_t error = gBufferModule->append_cloned(newBuffer, buffer, 0, size);
if (error != B_OK) {
gBufferModule->free(newBuffer);
return error;
}
// remove the part we've copied
gBufferModule->remove_header(buffer, size);
return B_OK;
}
status_t
UnixBufferQueue::Write(net_buffer* buffer)
{
if (buffer->size > Writable())
return ENOBUFS;
fBuffers.Add(buffer);
fSize += buffer->size;
return B_OK;
}
void
UnixBufferQueue::SetCapacity(size_t capacity)
{
fCapacity = capacity;
}
// #pragma mark -
UnixFifo::UnixFifo(size_t capacity)
:
fBuffer(capacity),
fReaders(),
fWriters(),
fReadRequested(0),
fWriteRequested(0),
fReaderSem(-1),
fWriterSem(-1),
fShutdown(0)
{
fLock.sem = -1;
}
UnixFifo::~UnixFifo()
{
if (fReaderSem >= 0)
delete_sem(fReaderSem);
if (fWriterSem >= 0)
delete_sem(fWriterSem);
if (fLock.sem >= 0)
benaphore_destroy(&fLock);
}
status_t
UnixFifo::Init()
{
status_t error = benaphore_init(&fLock, "unix fifo");
fReaderSem = create_sem(0, "unix fifo readers");
fWriterSem = create_sem(0, "unix fifo writers");
if (error != B_OK || fReaderSem < 0 || fWriterSem < 0)
return ENOBUFS;
return B_OK;
}
void
UnixFifo::Shutdown(uint32 shutdown)
{
fShutdown |= shutdown;
if ((shutdown & UNIX_FIFO_SHUTDOWN_READ) != 0)
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
if ((shutdown & UNIX_FIFO_SHUTDOWN_WRITE) != 0)
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
}
status_t
UnixFifo::Read(size_t numBytes, bigtime_t timeout, net_buffer** _buffer)
{
if (IsReadShutdown())
return UNIX_FIFO_SHUTDOWN;
Request request(numBytes);
fReaders.Add(&request);
fReadRequested += request.size;
status_t error = _Read(request, numBytes, timeout, _buffer);
bool firstInQueue = fReaders.Head() == &request;
fReaders.Remove(&request);
fReadRequested -= request.size;
if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
&& !IsReadShutdown()) {
// There's more to read, other readers, and we were first in the queue.
// So we need to notify the others.
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
}
if (error == B_OK && *_buffer != NULL && (*_buffer)->size > 0
&& !fWriters.IsEmpty() && !IsWriteShutdown()) {
// We read something and there are writers. Notify them
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
}
return error;
}
status_t
UnixFifo::Write(net_buffer* buffer, bigtime_t timeout)
{
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
Request request(buffer->size);
fWriters.Add(&request);
fWriteRequested += request.size;
status_t error = _Write(request, buffer, timeout);
bool firstInQueue = fWriters.Head() == &request;
fWriters.Remove(&request);
fWriteRequested -= request.size;
if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
&& !IsWriteShutdown()) {
// There's more space for writing, other writers, and we were first in
// the queue. So we need to notify the others.
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
}
if (error == B_OK && request.size > 0 && !fReaders.IsEmpty()
&& !IsReadShutdown()) {
// We've written something and there are readers. Notify them
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
}
return error;
}
size_t
UnixFifo::Readable() const
{
size_t readable = fBuffer.Readable();
return readable > fReadRequested ? readable - fReadRequested : 0;
}
size_t
UnixFifo::Writable() const
{
size_t writable = fBuffer.Writable();
return writable > fWriteRequested ? writable - fWriteRequested : 0;
}
void
UnixFifo::SetBufferCapacity(size_t capacity)
{
// check against allowed minimal/maximal value
if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
capacity = UNIX_FIFO_MINIMAL_CAPACITY;
size_t oldCapacity = fBuffer.Capacity();
if (capacity == oldCapacity)
return;
// set capacity
fBuffer.SetCapacity(capacity);
// wake up waiting writers, if the capacity increased
if (!fWriters.IsEmpty() && !IsWriteShutdown())
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
}
status_t
UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
net_buffer** _buffer)
{
// wait for the request to reach the front of the queue
if (fReaders.Head() != &request && timeout == 0)
return B_WOULD_BLOCK;
while (fReaders.Head() != &request && !IsReadShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fReaderSem, 1, timeout,
B_ABSOLUTE_TIMEOUT);
benaphore_lock(&fLock);
if (error != B_OK)
return error;
}
if (IsReadShutdown())
return UNIX_FIFO_SHUTDOWN;
// wait for any data to become available
if (fBuffer.Readable() == 0 && timeout == 0)
return B_WOULD_BLOCK;
while (fBuffer.Readable() == 0 && !IsReadShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fReaderSem, 1, timeout,
B_ABSOLUTE_TIMEOUT);
benaphore_lock(&fLock);
if (error != B_OK)
return error;
}
if (IsReadShutdown())
return UNIX_FIFO_SHUTDOWN;
return fBuffer.Read(numBytes, _buffer);
}
status_t
UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
{
// wait for the request to reach the front of the queue
if (fWriters.Head() != &request && timeout == 0)
return B_WOULD_BLOCK;
while (fWriters.Head() != &request && !IsWriteShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fWriterSem, 1, timeout,
B_ABSOLUTE_TIMEOUT);
benaphore_lock(&fLock);
if (error != B_OK)
return error;
}
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
// wait for any space to become available
if (fBuffer.Writable() < request.size && timeout == 0)
return B_WOULD_BLOCK;
while (fBuffer.Writable() < request.size && !IsWriteShutdown()) {
benaphore_unlock(&fLock);
status_t error = acquire_sem_etc(fWriterSem, 1, timeout,
B_ABSOLUTE_TIMEOUT);
benaphore_lock(&fLock);
if (error != B_OK)
return error;
}
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
return fBuffer.Write(buffer);
}

View File

@ -0,0 +1,124 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_FIFO_H
#define UNIX_FIFO_H
#include <Referenceable.h>
#include <lock.h>
#include <util/AutoLock.h>
#include <util/DoublyLinkedList.h>
#include <net_buffer.h>
#define UNIX_FIFO_SHUTDOWN_READ 1
#define UNIX_FIFO_SHUTDOWN_WRITE 2
#define UNIX_FIFO_SHUTDOWN 1
// error code returned by Read()/Write()
#define UNIX_FIFO_MINIMAL_CAPACITY 1024
#define UNIX_FIFO_MAXIMAL_CAPACITY (128 * 1024)
class UnixBufferQueue {
public:
UnixBufferQueue(size_t capacity);
~UnixBufferQueue();
size_t Readable() const { return fSize; }
size_t Writable() const
{ return fCapacity >= fSize ? fCapacity - fSize : 0; }
status_t Read(size_t size, net_buffer** _buffer);
status_t Write(net_buffer* buffer);
size_t Capacity() const { return fCapacity; }
void SetCapacity(size_t capacity);
private:
typedef DoublyLinkedList<net_buffer, DoublyLinkedListCLink<net_buffer> >
BufferList;
BufferList fBuffers;
size_t fSize;
size_t fCapacity;
};
class UnixFifo : public Referenceable {
public:
UnixFifo(size_t capacity);
~UnixFifo();
status_t Init();
bool Lock()
{
return benaphore_lock(&fLock) == B_OK;
}
void Unlock()
{
benaphore_unlock(&fLock);
}
void Shutdown(uint32 shutdown);
bool IsReadShutdown() const
{
return (fShutdown & UNIX_FIFO_SHUTDOWN_READ);
}
bool IsWriteShutdown() const
{
return (fShutdown & UNIX_FIFO_SHUTDOWN_WRITE);
}
status_t Read(size_t numBytes, bigtime_t timeout, net_buffer** _buffer);
status_t Write(net_buffer* buffer, bigtime_t timeout);
size_t Readable() const;
size_t Writable() const;
void SetBufferCapacity(size_t capacity);
private:
struct Request : DoublyLinkedListLinkImpl<Request> {
Request(size_t size)
:
size(size)
{
}
size_t size;
};
typedef DoublyLinkedList<Request> RequestList;
private:
status_t _Read(Request& request, size_t numBytes, bigtime_t timeout,
net_buffer** _buffer);
status_t _Write(Request& request, net_buffer* buffer, bigtime_t timeout);
private:
benaphore fLock;
UnixBufferQueue fBuffer;
RequestList fReaders;
RequestList fWriters;
size_t fReadRequested;
size_t fWriteRequested;
sem_id fReaderSem;
sem_id fWriterSem;
uint32 fShutdown;
};
typedef AutoLocker<UnixFifo> UnixFifoLocker;
#endif // UNIX_FIFO_H

View File

@ -0,0 +1,351 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include <stdio.h>
#include <sys/un.h>
#include <new>
#include <lock.h>
#include <util/AutoLock.h>
#include <vfs.h>
#include <net_buffer.h>
#include <net_protocol.h>
#include <net_socket.h>
#include <net_stack.h>
#include "UnixAddressManager.h"
#include "UnixEndpoint.h"
#define UNIX_MODULE_DEBUG_LEVEL 2
#define UNIX_DEBUG_LEVEL UNIX_MODULE_DEBUG_LEVEL
#include "UnixDebug.h"
extern net_protocol_module_info gUnixModule;
// extern only for forwarding
net_stack_module_info *gStackModule;
net_socket_module_info *gSocketModule;
net_buffer_module_info *gBufferModule;
UnixAddressManager gAddressManager;
static struct net_domain *sDomain;
net_protocol *
unix_init_protocol(net_socket *socket)
{
TRACE("[%ld] unix_init_protocol(%p)\n", find_thread(NULL), socket);
UnixEndpoint* endpoint = new(std::nothrow) UnixEndpoint(socket);
if (endpoint == NULL)
return NULL;
status_t error = endpoint->Init();
if (error != B_OK) {
delete endpoint;
return NULL;
}
return endpoint;
}
status_t
unix_uninit_protocol(net_protocol *_protocol)
{
TRACE("[%ld] unix_uninit_protocol(%p)\n", find_thread(NULL), _protocol);
((UnixEndpoint*)_protocol)->Uninit();
return B_OK;
}
status_t
unix_open(net_protocol *_protocol)
{
return ((UnixEndpoint*)_protocol)->Open();
}
status_t
unix_close(net_protocol *_protocol)
{
return ((UnixEndpoint*)_protocol)->Close();
}
status_t
unix_free(net_protocol *_protocol)
{
return ((UnixEndpoint*)_protocol)->Free();
}
status_t
unix_connect(net_protocol *_protocol, const struct sockaddr *address)
{
return ((UnixEndpoint*)_protocol)->Connect(address);
}
status_t
unix_accept(net_protocol *_protocol, struct net_socket **_acceptedSocket)
{
return ((UnixEndpoint*)_protocol)->Accept(_acceptedSocket);
}
status_t
unix_control(net_protocol *protocol, int level, int option, void *value,
size_t *_length)
{
return B_BAD_VALUE;
}
status_t
unix_getsockopt(net_protocol *protocol, int level, int option, void *value,
int *_length)
{
return gSocketModule->get_option(protocol->socket, level, option, value,
_length);
}
status_t
unix_setsockopt(net_protocol *protocol, int level, int option,
const void *_value, int length)
{
UnixEndpoint* endpoint = (UnixEndpoint*)protocol;
if (level == SOL_SOCKET) {
if (option == SO_RCVBUF) {
if (length != sizeof(int))
return B_BAD_VALUE;
endpoint->SetReceiveBufferSize(*(int*)_value);
} else if (option == SO_SNDBUF) {
// We don't have a receive buffer, so silently ignore this one.
}
}
return gSocketModule->set_option(protocol->socket, level, option,
_value, length);
}
status_t
unix_bind(net_protocol *_protocol, const struct sockaddr *_address)
{
return ((UnixEndpoint*)_protocol)->Bind(_address);
}
status_t
unix_unbind(net_protocol *_protocol, struct sockaddr *_address)
{
return ((UnixEndpoint*)_protocol)->Unbind();
}
status_t
unix_listen(net_protocol *_protocol, int count)
{
return ((UnixEndpoint*)_protocol)->Listen(count);
}
status_t
unix_shutdown(net_protocol *_protocol, int direction)
{
return ((UnixEndpoint*)_protocol)->Shutdown(direction);
}
status_t
unix_send_routed_data(net_protocol *_protocol, struct net_route *route,
net_buffer *buffer)
{
return B_ERROR;
}
status_t
unix_send_data(net_protocol *_protocol, net_buffer *buffer)
{
return ((UnixEndpoint*)_protocol)->Send(buffer);
}
ssize_t
unix_send_avail(net_protocol *_protocol)
{
return ((UnixEndpoint*)_protocol)->Sendable();
}
status_t
unix_read_data(net_protocol *_protocol, size_t numBytes, uint32 flags,
net_buffer **_buffer)
{
return ((UnixEndpoint*)_protocol)->Receive(numBytes, flags, _buffer);
}
ssize_t
unix_read_avail(net_protocol *_protocol)
{
return ((UnixEndpoint*)_protocol)->Receivable();
}
struct net_domain *
unix_get_domain(net_protocol *protocol)
{
return sDomain;
}
size_t
unix_get_mtu(net_protocol *protocol, const struct sockaddr *address)
{
return UNIX_MAX_TRANSFER_UNIT;
}
status_t
unix_receive_data(net_buffer *buffer)
{
return B_ERROR;
}
status_t
unix_deliver_data(net_protocol *_protocol, net_buffer *buffer)
{
return B_ERROR;
}
status_t
unix_error(uint32 code, net_buffer *data)
{
return B_ERROR;
}
status_t
unix_error_reply(net_protocol *protocol, net_buffer *causedError, uint32 code,
void *errorData)
{
return B_ERROR;
}
// #pragma mark -
status_t
init_unix()
{
new(&gAddressManager) UnixAddressManager;
status_t error = gAddressManager.Init();
if (error != B_OK)
return error;
error = gStackModule->register_domain_protocols(AF_UNIX, SOCK_STREAM, 0,
"network/protocols/unix/v1", NULL);
if (error != B_OK) {
gAddressManager.~UnixAddressManager();
return error;
}
error = gStackModule->register_domain(AF_UNIX, "unix", &gUnixModule,
&gAddressModule, &sDomain);
if (error != B_OK) {
gAddressManager.~UnixAddressManager();
return error;
}
return B_OK;
}
status_t
uninit_unix()
{
gStackModule->unregister_domain(sDomain);
gAddressManager.~UnixAddressManager();
return B_OK;
}
static status_t
unix_std_ops(int32 op, ...)
{
switch (op) {
case B_MODULE_INIT:
return init_unix();
case B_MODULE_UNINIT:
return uninit_unix();
default:
return B_ERROR;
}
}
net_protocol_module_info gUnixModule = {
{
"network/protocols/unix/v1",
0,
unix_std_ops
},
0, // NET_PROTOCOL_ATOMIC_MESSAGES,
unix_init_protocol,
unix_uninit_protocol,
unix_open,
unix_close,
unix_free,
unix_connect,
unix_accept,
unix_control,
unix_getsockopt,
unix_setsockopt,
unix_bind,
unix_unbind,
unix_listen,
unix_shutdown,
unix_send_data,
unix_send_routed_data,
unix_send_avail,
unix_read_data,
unix_read_avail,
unix_get_domain,
unix_get_mtu,
unix_receive_data,
unix_deliver_data,
unix_error,
unix_error_reply,
};
module_dependency module_dependencies[] = {
{NET_STACK_MODULE_NAME, (module_info **)&gStackModule},
{NET_BUFFER_MODULE_NAME, (module_info **)&gBufferModule},
// {NET_DATALINK_MODULE_NAME, (module_info **)&sDatalinkModule},
{NET_SOCKET_MODULE_NAME, (module_info **)&gSocketModule},
{}
};
module_info *modules[] = {
(module_info *)&gUnixModule,
NULL
};

View File

@ -0,0 +1,24 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#ifndef UNIX_H
#define UNIX_H
#define UNIX_MAX_TRANSFER_UNIT 65536
struct net_address_module_info;
struct net_buffer_module_info;
struct net_socket_module_info;
struct net_stack_module_info;
class UnixAddressManager;
extern net_address_module_info gAddressModule;
extern net_buffer_module_info *gBufferModule;
extern net_socket_module_info *gSocketModule;
extern net_stack_module_info *gStackModule;
extern UnixAddressManager gAddressManager;
#endif // UNIX_H