From 2c17b743c7ed82963dbc33f39db933081be22143 Mon Sep 17 00:00:00 2001 From: Ingo Weinhold Date: Tue, 25 Jan 2005 16:05:26 +0000 Subject: [PATCH] Implemented all missing functionality save the actual sending of a flattened message. Support for this needs to be added to BMessage first. git-svn-id: file:///srv/svn/repos/haiku/trunk/current@11034 a95241bf-73f2-0310-859d-f6bbb57e9c96 --- src/servers/registrar/MessageDeliverer.cpp | 86 ++++++++++++++++++++-- src/servers/registrar/MessageDeliverer.h | 2 + 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/src/servers/registrar/MessageDeliverer.cpp b/src/servers/registrar/MessageDeliverer.cpp index 671237f41d..6300743321 100644 --- a/src/servers/registrar/MessageDeliverer.cpp +++ b/src/servers/registrar/MessageDeliverer.cpp @@ -20,6 +20,8 @@ // sDeliverer -- the singleton instance MessageDeliverer *MessageDeliverer::sDeliverer = NULL; +static const bigtime_t kRetryDelay = 20000; // 20 ms + // Message class MessageDeliverer::Message : public Referencable { public: @@ -123,6 +125,12 @@ public: { } + ~TargetPort() + { + while (fFirstMessage) + PopMessage(); + } + port_id PortID() const { return fPortID; @@ -145,7 +153,7 @@ public: return B_OK; } - Message *PeekMessage(int32 &token) + Message *PeekMessage(int32 &token) const { if (!fFirstMessage) return NULL; @@ -165,6 +173,11 @@ public: } } + bool IsEmpty() const + { + return !fFirstMessage; + } + private: port_id fPortID; TargetMessage *fFirstMessage; @@ -189,8 +202,6 @@ MessageDeliverer::~MessageDeliverer() { fTerminating = true; -// TODO: How to stop the thread? - if (fDelivererThread >= 0) { int32 result; wait_for_thread(fDelivererThread, &result); @@ -339,6 +350,25 @@ MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize, if (!port) return B_NO_MEMORY; + // try sending the message, if there are no queued messages yet + if (port->IsEmpty()) { + status_t error = _SendMessage(message, targets[i].port, + targets[i].token); + // if the message was delivered OK, we're done with the target + if (error == B_OK) { + _PutTargetPort(port); + continue; + } + + // if the port is not full, but an error occurred, we skip this target + if (error != B_WOULD_BLOCK) { + _PutTargetPort(port); + if (targetCount == 1) + return error; + continue; + } + } + // add the message status_t error = port->PushMessage(message, targets[i].token); _PutTargetPort(port); @@ -377,13 +407,20 @@ MessageDeliverer::_PutTargetPort(TargetPort *port) if (!port) return; - int32 token; - if (!port->PeekMessage(token)) { + if (port->IsEmpty()) { fTargetPorts->erase(port->PortID()); delete port; } } +// _SendMessage +status_t +MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token) +{ +// TODO: Implement! + return B_ERROR; +} + // _DelivererThreadEntry int32 MessageDeliverer::_DelivererThreadEntry(void *data) @@ -395,8 +432,43 @@ MessageDeliverer::_DelivererThreadEntry(void *data) int32 MessageDeliverer::_DelivererThread() { -// while (fTerminating) { -// } + while (!fTerminating) { + snooze(kRetryDelay); + if (fTerminating) + break; + + // iterate through all target ports and try sending the messages + BAutolock _(fLock); + for (TargetPortMap::iterator it = fTargetPorts->begin(); + it != fTargetPorts->end();) { + TargetPort *port = it->second; + bool portError = false; + + // try sending all messages + int32 token; + while (Message *message = port->PeekMessage(token)) { + status_t error = _SendMessage(message, port->PortID(), token); + if (error == B_OK) { + port->PopMessage(); + } else if (error == B_WOULD_BLOCK) { + // no luck yet -- port is still full + break; + } else { + // unexpected error -- probably the port is gone + portError = true; + } + } + + // next port + if (portError) { + TargetPortMap::iterator oldIt = it; + ++it; + delete port; + fTargetPorts->erase(oldIt); + } else + ++it; + } + } return 0; } diff --git a/src/servers/registrar/MessageDeliverer.h b/src/servers/registrar/MessageDeliverer.h index 1b7e52a89b..f1d192f4ff 100644 --- a/src/servers/registrar/MessageDeliverer.h +++ b/src/servers/registrar/MessageDeliverer.h @@ -41,6 +41,8 @@ private: TargetPort *_GetTargetPort(port_id portID, bool create = false); void _PutTargetPort(TargetPort *port); + status_t _SendMessage(Message *message, port_id portID, int32 token); + static int32 _DelivererThreadEntry(void *data); int32 _DelivererThread();