From b35311f1ba07855a6152bb248b1758567f52e224 Mon Sep 17 00:00:00 2001 From: Pawel Dziepak Date: Fri, 25 May 2012 22:43:23 +0200 Subject: [PATCH] nfs4: Add RPC main communication code RPC::Server represents a server client is connected to. RPC::RequestManager stores all call that have been sent but no answer was received so far. RPC::ServerManager is responsible for reusing connections to the servers and closing them when no longer needed. --- src/add-ons/kernel/file_systems/nfs4/Jamfile | 1 + .../kernel/file_systems/nfs4/RPCServer.cpp | 349 ++++++++++++++++++ .../kernel/file_systems/nfs4/RPCServer.h | 104 ++++++ 3 files changed, 454 insertions(+) create mode 100644 src/add-ons/kernel/file_systems/nfs4/RPCServer.cpp create mode 100644 src/add-ons/kernel/file_systems/nfs4/RPCServer.h diff --git a/src/add-ons/kernel/file_systems/nfs4/Jamfile b/src/add-ons/kernel/file_systems/nfs4/Jamfile index 360f9ec6d3..db1386b800 100644 --- a/src/add-ons/kernel/file_systems/nfs4/Jamfile +++ b/src/add-ons/kernel/file_systems/nfs4/Jamfile @@ -8,5 +8,6 @@ KernelAddon nfs4 : RPCAuth.cpp RPCCall.cpp RPCReply.cpp + RPCServer.cpp XDR.cpp ; diff --git a/src/add-ons/kernel/file_systems/nfs4/RPCServer.cpp b/src/add-ons/kernel/file_systems/nfs4/RPCServer.cpp new file mode 100644 index 0000000000..031f0051cc --- /dev/null +++ b/src/add-ons/kernel/file_systems/nfs4/RPCServer.cpp @@ -0,0 +1,349 @@ +/* + * Copyright 2012 Haiku, Inc. All rights reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Paweł Dziepak, pdziepak@quarnos.org + */ + + +#include "RPCServer.h" + +#include + +#include "RPCReply.h" + + +using namespace RPC; + + +RequestManager::RequestManager() + : + fQueueHead(NULL), + fQueueTail(NULL) +{ + mutex_init(&fLock, NULL); +} + + +RequestManager::~RequestManager() +{ + mutex_destroy(&fLock); +} + + +void +RequestManager::AddRequest(Request* req) +{ + mutex_lock(&fLock); + if (fQueueTail != NULL) + fQueueTail->fNext = req; + else + fQueueHead = req; + fQueueTail = req; + mutex_unlock(&fLock); +} + + +Request* +RequestManager::FindRequest(uint32 xid) +{ + mutex_lock(&fLock); + Request* req = fQueueHead; + Request* prev = NULL; + while (req != NULL) { + if (req->fXID == xid) { + if (prev != NULL) + prev->fNext = req->fNext; + if (fQueueTail == req) + fQueueTail = prev; + if (fQueueHead == req) + fQueueHead = req->fNext; + mutex_unlock(&fLock); + dprintf("Found %x %x\n", (unsigned int)fQueueHead, (unsigned int)fQueueTail); + + return req; + } + + prev = req; + req = req->fNext; + } + dprintf("Nothing found\n"); + mutex_unlock(&fLock); + + return NULL; +} + + +Server::Server(Connection* conn) + : + fConnection(conn), + fXID(rand() << 1) +{ + _StartListening(); +} + + +Server::~Server() +{ + fThreadCancel = true; + + delete fConnection; + + status_t result; + wait_for_thread(fThread, &result); +} + + +status_t +Server::_StartListening() +{ + dprintf("new thread\n"); + fThreadCancel = false; + fThread = spawn_kernel_thread(&Server::_ListenerThreadStart, + "NFSv4 Listener", B_NORMAL_PRIORITY, this); + if (fThread < B_OK) + return fThread; + + status_t result = resume_thread(fThread); + if (result != B_OK) { + kill_thread(fThread); + return result; + } + + return B_OK; +} + + +status_t +Server::SendCall(Call* call, Reply** reply) +{ + status_t result; + + Request* req = new(std::nothrow) Request; + if (req == NULL) + return B_NO_MEMORY; + + uint32 xid = _GetXID(); + call->SetXID(xid); + req->fXID = xid; + req->fReply = reply; + req->fNext = NULL; + + fRequests.AddRequest(req); + + XDR::WriteStream& stream = call->Stream(); + result = fConnection->Send(stream.Buffer(), stream.Size()); + if (result != B_OK) + goto out_cancel; + + result = req->fEvent.Wait(B_RELATIVE_TIMEOUT, kWaitTime); + if (result != B_OK) + goto out_cancel; + + delete req; + return B_OK; + +out_cancel: + CancelCall(xid); + delete req; + return result; +} + + +void +Server::CancelCall(uint32 xid) +{ + fRequests.FindRequest(xid); +} + + +uint32 +Server::_GetXID() +{ + return (uint32)atomic_add(&fXID, 1); +} + + +status_t +Server::_Listener() +{ + status_t result; + uint32 size; + void* buffer; + + while (!fThreadCancel) { + result = fConnection->Receive(&buffer, &size); + if (result != B_OK) + return result; + + Reply* reply = new(std::nothrow) Reply(buffer, size); + if (reply == NULL) { + free(buffer); + return B_NO_MEMORY; + } + + Request* req = fRequests.FindRequest(reply->GetXID()); + if (req != NULL) { + *req->fReply = reply; + req->fEvent.NotifyAll(); + } + } + + return B_OK; +} + + +status_t +Server::_ListenerThreadStart(void* ptr) +{ + Server* server = reinterpret_cast(ptr); + return server->_Listener(); +} + + +ServerManager::ServerManager() + : + fRoot(NULL) +{ + mutex_init(&fLock, NULL); +} + + +ServerManager::~ServerManager() +{ +} + + +status_t +ServerManager::Acquire(Server** pserv, uint32 ip, uint16 port, Transport proto) +{ + status_t result; + + ServerAddress id; + id.fAddress = ip; + id.fPort = port; + id.fProtocol = proto; + + mutex_lock(&fLock); + ServerNode* node = _Find(id); + if (node != NULL) { + node->fRefCount++; + mutex_unlock(&fLock); + + *pserv = node->fServer; + return B_OK; + } + mutex_unlock(&fLock); + dprintf("creating\n"); + + node = new(std::nothrow) ServerNode; + if (node == NULL) + return B_NO_MEMORY; + + node->fID = id; + + Connection* conn; + result = Connection::Connect(&conn, id); + if (result != B_OK) { + delete node; + return result; + } + + node->fServer = new Server(conn); + if (node->fServer == NULL) { + delete node; + delete conn; + return B_NO_MEMORY; + } + + node->fRefCount = 1; + node->fLeft = node->fRight = NULL; + + // We need to be prepared if someone already connected to the server and + // updated the BST. In such case we use that connection and cancel ours. + mutex_lock(&fLock); + ServerNode* nd = _Insert(node); + if (nd != node) { + nd->fRefCount++; + mutex_unlock(&fLock); + + delete node->fServer; + delete node; + *pserv = nd->fServer; + return B_OK; + } + mutex_unlock(&fLock); + + *pserv = node->fServer; + return B_OK; +} + + +void +ServerManager::Release(Server* serv) +{ +#if 0 + mutex_lock(&fLock); + ServerNode* node = _Find(serv->GetID()); + if (node != NULL) { + node->fRefCount--; + + if (node->fRefCount == 0) + _Delete(node); + } + mutex_unlock(&fLock); + + if (node != NULL) { + delete node->fServer; + delete node; + } +#endif +} + + +ServerNode* +ServerManager::_Find(ServerAddress& id) +{ + ServerNode* node = fRoot; + while (node != NULL) { + dprintf("passing addr: %x port %d proto: %d\n", (int)node->fID.fAddress, (int)node->fID.fPort, (int)node->fID.fProtocol); + if (node->fID == id) + return node; + if (node->fID < id) + node = node->fRight; + else + node = node->fLeft; + } + + return node; +} + + +ServerNode* +ServerManager::_Insert(ServerNode* node) +{ + ServerNode* previous = NULL; + ServerNode* current = fRoot; + while (current != NULL) { + if (current->fID == node->fID) + return current; + if (current->fID < node->fID) { + previous = current; + current = current->fRight; + } else { + previous = current; + current = current->fLeft; + } + } + + if (previous == NULL) + fRoot = node; + else if (previous->fID < node->fID) + previous->fRight = node; + else + previous->fLeft = node; + + return node; +} + diff --git a/src/add-ons/kernel/file_systems/nfs4/RPCServer.h b/src/add-ons/kernel/file_systems/nfs4/RPCServer.h new file mode 100644 index 0000000000..3c03cf02db --- /dev/null +++ b/src/add-ons/kernel/file_systems/nfs4/RPCServer.h @@ -0,0 +1,104 @@ +/* + * Copyright 2012 Haiku, Inc. All rights reserved. + * Distributed under the terms of the MIT License. + * + * Authors: + * Paweł Dziepak, pdziepak@quarnos.org + */ +#ifndef RPCSERVER_H +#define RPCSERVER_H + + +#include +#include + +#include "Connection.h" +#include "RPCCall.h" +#include "RPCReply.h" + + +namespace RPC { + +struct Request { + uint32 fXID; + ConditionVariable fEvent; + Reply** fReply; + + Request* fNext; +}; + +class RequestManager { +public: + RequestManager(); + ~RequestManager(); + + void AddRequest(Request* req); + Request* FindRequest(uint32 xid); + +private: + mutex fLock; + Request* fQueueHead; + Request* fQueueTail; + +}; + +class Server { +public: + Server(Connection* conn); + virtual ~Server(); + + status_t SendCall(Call* call, Reply** reply); + inline void CancelCall(uint32 xid); + +protected: + inline uint32 _GetXID(); + status_t _StartListening(); + +private: + status_t _Listener(); + static status_t _ListenerThreadStart(void* ptr); + + thread_id fThread; + bool fThreadCancel; + + RequestManager fRequests; + Connection* fConnection; + + vint32 fXID; + static const bigtime_t kWaitTime = 1000000; +}; + +struct ServerNode { + ServerAddress fID; + Server* fServer; + int fRefCount; + + ServerNode* fLeft; + ServerNode* fRight; +}; + +class ServerManager { +public: + ServerManager(); + ~ServerManager(); + + status_t Acquire(Server** pserv, uint32 ip, uint16 port, + Transport proto); + void Release(Server* serv); + +private: + + + ServerNode* _Find(ServerAddress& id); + void _Delete(ServerNode* node); + ServerNode* _Insert(ServerNode* node); + + ServerNode* fRoot; + mutex fLock; +}; + +} // namespace RPC + + +#endif // RPCSERVER_H +