Implemented all missing functionality save the actual sending of a

flattened message. Support for this needs to be added to BMessage first.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@11034 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2005-01-25 16:05:26 +00:00
parent 5ac1156848
commit 2c17b743c7
2 changed files with 81 additions and 7 deletions

View File

@ -20,6 +20,8 @@
// sDeliverer -- the singleton instance
MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
static const bigtime_t kRetryDelay = 20000; // 20 ms
// Message
class MessageDeliverer::Message : public Referencable {
public:
@ -123,6 +125,12 @@ public:
{
}
~TargetPort()
{
while (fFirstMessage)
PopMessage();
}
port_id PortID() const
{
return fPortID;
@ -145,7 +153,7 @@ public:
return B_OK;
}
Message *PeekMessage(int32 &token)
Message *PeekMessage(int32 &token) const
{
if (!fFirstMessage)
return NULL;
@ -165,6 +173,11 @@ public:
}
}
bool IsEmpty() const
{
return !fFirstMessage;
}
private:
port_id fPortID;
TargetMessage *fFirstMessage;
@ -189,8 +202,6 @@ MessageDeliverer::~MessageDeliverer()
{
fTerminating = true;
// TODO: How to stop the thread?
if (fDelivererThread >= 0) {
int32 result;
wait_for_thread(fDelivererThread, &result);
@ -339,6 +350,25 @@ MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
if (!port)
return B_NO_MEMORY;
// try sending the message, if there are no queued messages yet
if (port->IsEmpty()) {
status_t error = _SendMessage(message, targets[i].port,
targets[i].token);
// if the message was delivered OK, we're done with the target
if (error == B_OK) {
_PutTargetPort(port);
continue;
}
// if the port is not full, but an error occurred, we skip this target
if (error != B_WOULD_BLOCK) {
_PutTargetPort(port);
if (targetCount == 1)
return error;
continue;
}
}
// add the message
status_t error = port->PushMessage(message, targets[i].token);
_PutTargetPort(port);
@ -377,13 +407,20 @@ MessageDeliverer::_PutTargetPort(TargetPort *port)
if (!port)
return;
int32 token;
if (!port->PeekMessage(token)) {
if (port->IsEmpty()) {
fTargetPorts->erase(port->PortID());
delete port;
}
}
// _SendMessage
status_t
MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
{
// TODO: Implement!
return B_ERROR;
}
// _DelivererThreadEntry
int32
MessageDeliverer::_DelivererThreadEntry(void *data)
@ -395,8 +432,43 @@ MessageDeliverer::_DelivererThreadEntry(void *data)
int32
MessageDeliverer::_DelivererThread()
{
// while (fTerminating) {
// }
while (!fTerminating) {
snooze(kRetryDelay);
if (fTerminating)
break;
// iterate through all target ports and try sending the messages
BAutolock _(fLock);
for (TargetPortMap::iterator it = fTargetPorts->begin();
it != fTargetPorts->end();) {
TargetPort *port = it->second;
bool portError = false;
// try sending all messages
int32 token;
while (Message *message = port->PeekMessage(token)) {
status_t error = _SendMessage(message, port->PortID(), token);
if (error == B_OK) {
port->PopMessage();
} else if (error == B_WOULD_BLOCK) {
// no luck yet -- port is still full
break;
} else {
// unexpected error -- probably the port is gone
portError = true;
}
}
// next port
if (portError) {
TargetPortMap::iterator oldIt = it;
++it;
delete port;
fTargetPorts->erase(oldIt);
} else
++it;
}
}
return 0;
}

View File

@ -41,6 +41,8 @@ private:
TargetPort *_GetTargetPort(port_id portID, bool create = false);
void _PutTargetPort(TargetPort *port);
status_t _SendMessage(Message *message, port_id portID, int32 token);
static int32 _DelivererThreadEntry(void *data);
int32 _DelivererThread();