BSocketMessenger: Further improvements.

- Messages that expect a reply are now tagged with a unique ID field to
  indicate that expectation to the receiving socket messenger.
- The messenger now maintains a map of received reply IDs and their
  corresponding messages, along with a message queue of other unsolicited
  replies.
- After successfully connecting, the messenger now spawns a thread
  whose sole responsibility is receiving and parsing all incoming messages,
  and consequently sorting them into the aforementioned data structures based
  on the presence of the reply ID. Callers who are awaiting either replies or
  other messages are signalled appropriately via a semaphore. This allows
  multiplexing of both types of messages on the same socket.
This commit is contained in:
Rene Gollent 2016-05-14 21:22:26 -04:00
parent 9c7d0c3157
commit b8a716965a
2 changed files with 286 additions and 9 deletions

View File

@ -38,17 +38,30 @@ public:
virtual status_t SendMessage(const BMessage& message,
BMessenger& replyTarget,
bigtime_t timeout = B_INFINITE_TIMEOUT);
virtual status_t SendReply(const BMessage& message,
const BMessage& reply);
// wait for unsolicited message on socket
virtual status_t ReceiveMessage(BMessage& _message,
bigtime_t timeout = B_INFINITE_TIMEOUT);
private:
struct Private;
private:
BSocketMessenger(const BSocketMessenger&);
BSocketMessenger& operator=(const BSocketMessenger&);
void _Init();
status_t _WaitForMessage(bigtime_t timeout);
status_t _SendMessage(const BMessage& message);
status_t _ReadMessage(BMessage& _message,
bigtime_t timeout);
status_t _ReadMessage(BMessage& _message);
status_t _ReadReply(int64 replyID,
BMessage& _reply, bigtime_t timeout);
static status_t _MessageReader(void* arg);
private:
Private* fPrivateData;
BSocket fSocket;
status_t fInitStatus;
};

View File

@ -7,44 +7,144 @@
#include <SocketMessenger.h>
#include <Message.h>
#include <MessageQueue.h>
#include <Messenger.h>
#include <AutoDeleter.h>
#include <AutoLocker.h>
#include <HashMap.h>
static const char* kReplySenderIDField = "socket_messenger:sender_reply_id";
static const char* kReplyReceiverIDField = "socket_messenger:reply_id";
// #pragma mark - BSocketMessenger::Private
struct BSocketMessenger::Private {
typedef SynchronizedHashMap<HashKey64<int64>,
BMessage> ReplyMessageMap;
Private();
virtual ~Private();
void ClearMessages();
sem_id fMessageWaiters;
thread_id fReplyReader;
ReplyMessageMap fReceivedReplies;
BMessageQueue fReceivedMessages;
int64 fReplyIDCounter;
};
BSocketMessenger::Private::Private()
:
fMessageWaiters(-1),
fReplyReader(-1),
fReceivedReplies(),
fReceivedMessages(),
fReplyIDCounter(0)
{
}
BSocketMessenger::Private::~Private()
{
if (fMessageWaiters > 0)
delete_sem(fMessageWaiters);
if (fReplyReader > 0)
wait_for_thread(fReplyReader, NULL);
ClearMessages();
}
void
BSocketMessenger::Private::ClearMessages()
{
fReceivedReplies.Clear();
AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
while (!fReceivedMessages.IsEmpty())
delete fReceivedMessages.NextMessage();
}
// #pragma mark - BSocketMessenger
BSocketMessenger::BSocketMessenger()
:
fPrivateData(NULL),
fSocket(),
fInitStatus(B_NO_INIT)
{
_Init();
}
BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
bigtime_t timeout)
:
fPrivateData(NULL),
fSocket(),
fInitStatus(B_NO_INIT)
{
_Init();
SetTo(address, timeout);
}
BSocketMessenger::BSocketMessenger(const BSocket& socket)
:
fSocket(socket)
fPrivateData(NULL),
fSocket(socket),
fInitStatus(B_NO_INIT)
{
_Init();
if (fPrivateData == NULL)
return;
fInitStatus = socket.InitCheck();
if (fInitStatus != B_OK)
return;
fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
"Message Reader", B_NORMAL_PRIORITY, this);
if (fPrivateData->fReplyReader < 0)
fInitStatus = fPrivateData->fReplyReader;
if (fInitStatus != B_OK) {
exit_thread(fPrivateData->fReplyReader);
fPrivateData->fReplyReader = -1;
return;
}
fInitStatus = resume_thread(fPrivateData->fReplyReader);
}
BSocketMessenger::~BSocketMessenger()
{
Unset();
delete fPrivateData;
}
void
BSocketMessenger::Unset()
{
if (fPrivateData == NULL)
return;
fSocket.Disconnect();
wait_for_thread(fPrivateData->fReplyReader, NULL);
fPrivateData->fReplyReader = -1;
fPrivateData->ClearMessages();
release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
fInitStatus = B_NO_INIT;
}
@ -52,7 +152,22 @@ BSocketMessenger::Unset()
status_t
BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
{
return fInitStatus = fSocket.Connect(address, timeout);
Unset();
if (fPrivateData == NULL)
return B_NO_MEMORY;
fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
"Message Reader", B_NORMAL_PRIORITY, this);
if (fPrivateData->fReplyReader < 0)
return fPrivateData->fReplyReader;
status_t error = fSocket.Connect(address, timeout);
if (error != B_OK) {
Unset();
return error;
}
return fInitStatus = resume_thread(fPrivateData->fReplyReader);
}
@ -74,11 +189,14 @@ status_t
BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
bigtime_t timeout)
{
status_t error = _SendMessage(message);
int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
BMessage temp(message);
temp.AddInt64(kReplySenderIDField, replyID);
status_t error = _SendMessage(temp);
if (error != B_OK)
return error;
return _ReadMessage(_reply, timeout);
return _ReadReply(replyID, _reply, timeout);
}
@ -95,10 +213,90 @@ BSocketMessenger::SendMessage(const BMessage& message,
}
status_t
BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
{
int64 replyID;
if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
return B_NOT_ALLOWED;
BMessage replyMessage(reply);
replyMessage.AddInt64(kReplyReceiverIDField, replyID);
return SendMessage(replyMessage);
}
status_t
BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
{
return _ReadMessage(_message, timeout);
status_t error = B_OK;
AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
for (;;) {
if (!fPrivateData->fReceivedMessages.IsEmpty()) {
BMessage* nextMessage
= fPrivateData->fReceivedMessages.NextMessage();
_message = *nextMessage;
delete nextMessage;
break;
}
queueLocker.Unlock();
error = _WaitForMessage(timeout);
if (error != B_OK)
break;
if (!fSocket.IsConnected()) {
error = B_CANCELED;
break;
}
queueLocker.Lock();
}
return error;
}
void
BSocketMessenger::_Init()
{
if (fPrivateData != NULL)
return;
BSocketMessenger::Private* data
= new(std::nothrow) BSocketMessenger::Private;
if (data == NULL) {
fInitStatus = B_NO_MEMORY;
return;
}
data->fMessageWaiters = create_sem(0, "message waiters");
if (data->fMessageWaiters < 0) {
fInitStatus = data->fMessageWaiters;
delete data;
return;
}
fPrivateData = data;
}
status_t
BSocketMessenger::_WaitForMessage(bigtime_t timeout)
{
for (;;) {
status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
B_RELATIVE_TIMEOUT, timeout);
if (error == B_INTERRUPTED) {
if (timeout != B_INFINITE_TIMEOUT)
timeout -= system_time();
continue;
}
if (error != B_OK)
return error;
break;
}
return B_OK;
}
@ -128,9 +326,9 @@ BSocketMessenger::_SendMessage(const BMessage& message)
status_t
BSocketMessenger::_ReadMessage(BMessage& _message, bigtime_t timeout)
BSocketMessenger::_ReadMessage(BMessage& _message)
{
status_t error = fSocket.WaitForReadable(timeout);
status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
if (error != B_OK)
return error;
@ -159,3 +357,69 @@ BSocketMessenger::_ReadMessage(BMessage& _message, bigtime_t timeout)
return _message.Unflatten(buffer);
}
status_t
BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
bigtime_t timeout)
{
status_t error = B_OK;
for (;;) {
if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
reply = fPrivateData->fReceivedReplies.Remove(replyID);
break;
}
error = _WaitForMessage(timeout);
if (error != B_OK)
break;
if (!fSocket.IsConnected()) {
error = B_CANCELED;
break;
}
}
return error;
}
status_t
BSocketMessenger::_MessageReader(void* arg)
{
BSocketMessenger* messenger = (BSocketMessenger*)arg;
BSocketMessenger::Private* data = messenger->fPrivateData;
status_t error = B_OK;
for (;;) {
BMessage message;
error = messenger->_ReadMessage(message);
if (error != B_OK)
break;
int64 replyID;
if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
error = data->fReceivedReplies.Put(replyID, message);
if (error != B_OK)
break;
} else {
BMessage* queueMessage = new(std::nothrow) BMessage(message);
if (queueMessage == NULL) {
error = B_NO_MEMORY;
break;
}
AutoLocker<BMessageQueue> queueLocker(
data->fReceivedMessages);
data->fReceivedMessages.AddMessage(queueMessage);
}
release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
}
// if we exit our message loop, ensure everybody wakes up and knows
// we're no longer receiving messages.
messenger->fSocket.Disconnect();
release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
return error;
}