nfs4: Support callbacks over IPv6

This commit is contained in:
Pawel Dziepak 2013-02-13 01:14:45 +01:00
parent 07b3bd59ab
commit 266b99b7e9
8 changed files with 172 additions and 59 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 Haiku, Inc. All rights reserved.
* Copyright 2012-2013 Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -61,6 +61,26 @@ PeerAddress::PeerAddress()
}
PeerAddress::PeerAddress(int networkFamily)
:
fProtocol(0)
{
ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
memset(&fAddress, 0, sizeof(fAddress));
fAddress.ss_family = networkFamily;
switch (networkFamily) {
case AF_INET:
fAddress.ss_len = sizeof(sockaddr_in);
break;
case AF_INET6:
fAddress.ss_len = sizeof(sockaddr_in6);
break;
}
}
const char*
PeerAddress::ProtocolString() const
{
@ -94,13 +114,11 @@ PeerAddress::SetProtocol(const char* protocol)
char*
PeerAddress::UniversalAddress() const
{
const sockaddr* address = reinterpret_cast<const sockaddr*>(&fAddress);
char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16));
if (uAddr == NULL)
return NULL;
if (inet_ntop(address->sa_family, InAddr(), uAddr, AddressSize()) == NULL)
if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL)
return NULL;
char port[16];
@ -114,7 +132,7 @@ PeerAddress::UniversalAddress() const
socklen_t
PeerAddress::AddressSize() const
{
switch (reinterpret_cast<const sockaddr*>(&fAddress)->sa_family) {
switch (Family()) {
case AF_INET:
return sizeof(sockaddr_in);
case AF_INET6:
@ -130,7 +148,7 @@ PeerAddress::Port() const
{
uint16 port;
switch (reinterpret_cast<const sockaddr*>(&fAddress)->sa_family) {
switch (Family()) {
case AF_INET:
port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port;
break;
@ -150,7 +168,7 @@ PeerAddress::SetPort(uint16 port)
{
port = htons(port);
switch (reinterpret_cast<sockaddr*>(&fAddress)->sa_family) {
switch (Family()) {
case AF_INET:
reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port;
break;
@ -160,11 +178,10 @@ PeerAddress::SetPort(uint16 port)
}
}
const void*
PeerAddress::InAddr() const
{
switch (reinterpret_cast<const sockaddr*>(&fAddress)->sa_family) {
switch (Family()) {
case AF_INET:
return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr;
case AF_INET6:
@ -178,7 +195,7 @@ PeerAddress::InAddr() const
size_t
PeerAddress::InAddrSize() const
{
switch (reinterpret_cast<const sockaddr*>(&fAddress)->sa_family) {
switch (Family()) {
case AF_INET:
return sizeof(in_addr);
case AF_INET6:
@ -723,21 +740,23 @@ ConnectionBase::Disconnect()
status_t
ConnectionListener::Listen(ConnectionListener** listener, uint16 port)
ConnectionListener::Listen(ConnectionListener** listener, int networkFamily,
uint16 port)
{
ASSERT(listener != NULL);
ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0)
return errno;
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_len = sizeof(addr);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != B_OK) {
PeerAddress address(networkFamily);
address.SetPort(port);
address.fProtocol = IPPROTO_TCP;
status_t result = bind(sock, (sockaddr*)&address.fAddress,
address.AddressSize());
if (result != B_OK) {
close(sock);
return errno;
}
@ -747,17 +766,12 @@ ConnectionListener::Listen(ConnectionListener** listener, uint16 port)
return errno;
}
PeerAddress address;
address.fProtocol = IPPROTO_TCP;
memset(&address.fAddress, 0, sizeof(address.fAddress));
*listener = new(std::nothrow) ConnectionListener(address);
if (*listener == NULL) {
close(sock);
return B_NO_MEMORY;
}
status_t result;
if ((*listener)->fWaitCancel < B_OK) {
result = (*listener)->fWaitCancel;
close(sock);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 Haiku, Inc. All rights reserved.
* Copyright 2012-2013 Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -25,6 +25,9 @@ struct PeerAddress {
PeerAddress& operator=(const PeerAddress& address);
PeerAddress();
PeerAddress(int networkFamily);
inline int Family() const;
const char* ProtocolString() const;
void SetProtocol(const char* protocol);
@ -40,6 +43,14 @@ struct PeerAddress {
size_t InAddrSize() const;
};
inline int
PeerAddress::Family() const
{
return fAddress.ss_family;
}
struct addrinfo;
class AddressResolver {
@ -122,7 +133,8 @@ public:
class ConnectionListener : public ConnectionBase {
public:
static status_t Listen(ConnectionListener** listener, uint16 port = 0);
static status_t Listen(ConnectionListener** listener, int networkFamily,
uint16 port = 0);
status_t AcceptConnection(Connection** connection);

View File

@ -14,6 +14,7 @@
namespace RPC {
class CallbackServer;
class CallbackRequest;
class Server;
@ -24,10 +25,14 @@ public:
inline void SetID(int32 id);
inline int32 ID();
inline void SetCBServer(CallbackServer* server);
inline CallbackServer* CBServer();
status_t EnqueueRequest(CallbackRequest* request,
Connection* connection);
private:
CallbackServer* fCBServer;
Server* fServer;
int32 fID;
};
@ -47,6 +52,20 @@ Callback::ID()
}
inline void
Callback::SetCBServer(CallbackServer* server)
{
fCBServer = server;
}
inline CallbackServer*
Callback::CBServer()
{
return fCBServer;
}
} // namespace RPC

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 Haiku, Inc. All rights reserved.
* Copyright 2012-2013 Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -13,22 +13,25 @@
#include "RPCCallback.h"
#include "RPCCallbackReply.h"
#include "RPCCallbackRequest.h"
#include "RPCServer.h"
using namespace RPC;
CallbackServer* gRPCCallbackServer = NULL;
CallbackServer* gRPCCallbackServer6 = NULL;
CallbackServer::CallbackServer()
CallbackServer::CallbackServer(int networkFamily)
:
fConnectionList(NULL),
fListener(NULL),
fThreadRunning(false),
fCallbackArray(NULL),
fArraySize(0),
fFreeSlot(-1)
fFreeSlot(-1),
fNetworkFamily(networkFamily)
{
mutex_init(&fConnectionLock, NULL);
mutex_init(&fThreadLock, NULL);
@ -47,6 +50,47 @@ CallbackServer::~CallbackServer()
}
CallbackServer*
CallbackServer::Get(Server* server)
{
ASSERT(server != NULL);
int family = server->ID().Family();
ASSERT(family == AF_INET || family == AF_INET6);
int idx;
switch (family) {
case AF_INET:
idx = 0;
break;
case AF_INET6:
idx = 1;
break;
default:
return NULL;
}
MutexLocker _(fServerCreationLock);
if (fServers[idx] == NULL)
fServers[idx] = new CallbackServer(family);
return fServers[idx];
}
void
CallbackServer::ShutdownAll()
{
MutexLocker _(fServerCreationLock);
for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++)
delete fServers[i];
memset(&fServers, 0, sizeof(fServers));
}
mutex CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
CallbackServer* CallbackServer::fServers[2] = { NULL, NULL };
status_t
CallbackServer::RegisterCallback(Callback* callback)
{
@ -82,6 +126,7 @@ CallbackServer::RegisterCallback(Callback* callback)
fCallbackArray[id].fCallback = callback;
callback->SetID(id);
callback->SetCBServer(this);
return B_OK;
}
@ -91,6 +136,7 @@ status_t
CallbackServer::UnregisterCallback(Callback* callback)
{
ASSERT(callback != NULL);
ASSERT(callback->CBServer() == this);
int32 id = callback->ID();
@ -98,6 +144,7 @@ CallbackServer::UnregisterCallback(Callback* callback)
fCallbackArray[id].fNext = fFreeSlot;
fFreeSlot = id;
callback->SetCBServer(NULL);
return B_OK;
}
@ -109,7 +156,7 @@ CallbackServer::StartServer()
if (fThreadRunning)
return B_OK;
status_t result = ConnectionListener::Listen(&fListener);
status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
if (result != B_OK)
return result;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 Haiku, Inc. All rights reserved.
* Copyright 2012-2013 Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -17,6 +17,7 @@
namespace RPC {
class Callback;
class Server;
struct ConnectionEntry {
Connection* fConnection;
@ -33,13 +34,16 @@ union CallbackSlot {
class CallbackServer {
public:
CallbackServer();
CallbackServer(int networkFamily);
~CallbackServer();
static CallbackServer* Get(Server* server);
static void ShutdownAll();
status_t RegisterCallback(Callback* callback);
status_t UnregisterCallback(Callback* callback);
inline PeerAddress LocalID();
inline PeerAddress LocalID();
protected:
status_t StartServer();
@ -57,6 +61,9 @@ protected:
inline Callback* GetCallback(int32 id);
private:
static mutex fServerCreationLock;
static CallbackServer* fServers[2];
mutex fConnectionLock;
ConnectionEntry* fConnectionList;
ConnectionListener* fListener;
@ -69,6 +76,8 @@ private:
CallbackSlot* fCallbackArray;
uint32 fArraySize;
int32 fFreeSlot;
int fNetworkFamily;
};
@ -76,6 +85,8 @@ inline PeerAddress
CallbackServer::LocalID()
{
PeerAddress address;
ASSERT(fListener != NULL);
fListener->GetLocalAddress(&address);
return address;
}
@ -93,9 +104,5 @@ CallbackServer::GetCallback(int32 id)
} // namespace RPC
extern RPC::CallbackServer* gRPCCallbackServer;
#endif // RPCCALLBACKSERVER_H

View File

@ -97,7 +97,7 @@ Server::Server(Connection* connection, PeerAddress* address)
Server::~Server()
{
if (fCallback != NULL)
gRPCCallbackServer->UnregisterCallback(fCallback);
fCallback->CBServer()->UnregisterCallback(fCallback);
delete fCallback;
mutex_destroy(&fCallbackLock);
mutex_destroy(&fRepairLock);
@ -235,9 +235,22 @@ Callback*
Server::GetCallback()
{
MutexLocker _(fCallbackLock);
if (fCallback == NULL) {
fCallback = new(std::nothrow) Callback(this);
gRPCCallbackServer->RegisterCallback(fCallback);
if (fCallback == NULL)
return NULL;
CallbackServer* server = CallbackServer::Get(this);
if (server == NULL) {
delete fCallback;
return NULL;
}
if (server->RegisterCallback(fCallback) != B_OK) {
delete fCallback;
return NULL;
}
}
return fCallback;

View File

@ -671,21 +671,29 @@ RequestBuilder::SetClientID(RPC::Server* server)
fRequest->Stream().AddUInt(0x40000000);
uint32 id = server->GetCallback()->ID();
if (server->GetCallback() != NULL) {
ASSERT(server->GetCallback()->CBServer() != NULL);
PeerAddress local = gRPCCallbackServer->LocalID();
PeerAddress servAddr = server->LocalID();
servAddr.SetPort(local.Port());
uint32 id = server->GetCallback()->ID();
fRequest->Stream().AddString(local.ProtocolString());
PeerAddress local = server->GetCallback()->CBServer()->LocalID();
PeerAddress servAddr = server->LocalID();
servAddr.SetPort(local.Port());
char* uAddr = servAddr.UniversalAddress();
if (uAddr == NULL)
return B_NO_MEMORY;
fRequest->Stream().AddString(uAddr);
free(uAddr);
fRequest->Stream().AddString(local.ProtocolString());
fRequest->Stream().AddUInt(id);
char* uAddr = servAddr.UniversalAddress();
if (uAddr == NULL)
return B_NO_MEMORY;
fRequest->Stream().AddString(uAddr);
free(uAddr);
fRequest->Stream().AddUInt(id);
} else {
fRequest->Stream().AddString("");
fRequest->Stream().AddString("");
fRequest->Stream().AddUInt(0);
}
fOpCount++;

View File

@ -1367,14 +1367,6 @@ nfs4_init()
return B_NO_MEMORY;
}
gRPCCallbackServer = new(std::nothrow) RPC::CallbackServer;
if (gRPCCallbackServer == NULL) {
mutex_destroy(&gIdMapperLock);
delete gWorkQueue;
delete gRPCServerManager;
return B_NO_MEMORY;
}
return B_OK;
}
@ -1382,7 +1374,8 @@ nfs4_init()
status_t
nfs4_uninit()
{
delete gRPCCallbackServer;
RPC::CallbackServer::ShutdownAll();
delete gIdMapper;
delete gWorkQueue;
delete gRPCServerManager;