* Added per-port sanity limits. We limit the number and summed size of

messages.
* We now maintain a second message list per port, which is sorted by
  timeout time. Thus we can drop timed out messages as early as possible.
* Fixed a bug which caused messages to disappear in the port list.
* Now delete a port not only when an error occurred when delivering a
  message, but also when it is empty.
* More debug output.

Seems to be working well now.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@11142 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2005-01-30 00:54:59 +00:00
parent 0353d33137
commit c6dbc50e2b
2 changed files with 152 additions and 37 deletions

View File

@ -5,10 +5,12 @@
#include <map>
#include <new>
#include <set>
#include <AutoDeleter.h>
#include <Autolock.h>
#include <DataIO.h>
#include <DoublyLinkedList2.h>
#include <MessagePrivate.h>
#include <MessengerPrivate.h>
#include <OS.h>
@ -16,13 +18,18 @@
#include <messaging.h>
#include "Debug.h"
#include "MessageDeliverer.h"
#include "Referenceable.h"
// sDeliverer -- the singleton instance
MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
static const bigtime_t kRetryDelay = 20000; // 20 ms
static const bigtime_t kRetryDelay = 100000; // 100 ms
// per port sanity limits
static const int32 kMaxMessagesPerPort = 10000;
static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB
// Message
class MessageDeliverer::Message : public Referenceable {
@ -67,6 +74,11 @@ public:
return fTimeoutTime;
}
bool HasTimeout() const
{
return (fTimeoutTime < B_INFINITE_TIMEOUT);
}
void SetBusy(bool busy)
{
fBusy = busy;
@ -86,12 +98,12 @@ private:
};
// TargetMessage
class MessageDeliverer::TargetMessage {
class MessageDeliverer::TargetMessage
: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
public:
TargetMessage(Message *message, int32 token)
: fMessage(message),
fToken(token),
fNext(NULL)
fToken(token)
{
if (fMessage)
fMessage->AddReference();
@ -113,20 +125,58 @@ public:
return fToken;
}
void SetNext(TargetMessage *next)
private:
Message *fMessage;
int32 fToken;
};
// TargetMessageHandle
class MessageDeliverer::TargetMessageHandle {
public:
TargetMessageHandle(TargetMessage *message)
: fMessage(message)
{
fNext = next;
}
TargetMessage *Next() const
TargetMessageHandle(const TargetMessageHandle &other)
: fMessage(other.fMessage)
{
return fNext;
}
TargetMessage *GetMessage() const
{
return fMessage;
}
TargetMessageHandle &operator=(const TargetMessageHandle &other)
{
fMessage = other.fMessage;
return *this;
}
bool operator==(const TargetMessageHandle &other) const
{
return (fMessage == other.fMessage);
}
bool operator!=(const TargetMessageHandle &other) const
{
return (fMessage != other.fMessage);
}
bool operator<(const TargetMessageHandle &other) const
{
bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
if (timeout < otherTimeout)
return true;
if (timeout > otherTimeout)
return false;
return (fMessage < other.fMessage);
}
private:
Message *fMessage;
int32 fToken;
TargetMessage *fNext;
TargetMessage *fMessage;
};
// TargetPort
@ -134,14 +184,15 @@ class MessageDeliverer::TargetPort {
public:
TargetPort(port_id portID)
: fPortID(portID),
fFirstMessage(NULL),
fLastMessage(NULL)
fMessages(),
fMessageCount(0),
fMessageSize(0)
{
}
~TargetPort()
{
while (fFirstMessage)
while (!fMessages.IsEmpty())
PopMessage();
}
@ -152,6 +203,8 @@ public:
status_t PushMessage(Message *message, int32 token)
{
PRINT(("MessageDeliverer::TargetPort::PushMessage(port: %ld, %p, %ld)\n",
fPortID, message, token));
// create a target message
TargetMessage *targetMessage
= new(nothrow) TargetMessage(message, token);
@ -159,49 +212,103 @@ public:
return B_NO_MEMORY;
// push it
if (fLastMessage)
fLastMessage->SetNext(targetMessage);
else
fFirstMessage = fLastMessage = targetMessage;
fMessages.Insert(targetMessage);
fMessageCount++;
fMessageSize += targetMessage->GetMessage()->DataSize();
// add it to the timeoutable messages, if it has a timeout
if (message->HasTimeout())
fTimeoutableMessages.insert(targetMessage);
_EnforceLimits();
return B_OK;
}
Message *PeekMessage(int32 &token) const
{
if (!fFirstMessage)
if (!fMessages.Head())
return NULL;
token = fFirstMessage->Token();
return fFirstMessage->GetMessage();
token = fMessages.Head()->Token();
return fMessages.Head()->GetMessage();
}
void PopMessage()
{
if (fFirstMessage) {
TargetMessage *message = fFirstMessage;
fFirstMessage = message->Next();
if (!fFirstMessage)
fLastMessage = NULL;
delete message;
if (fMessages.Head()) {
PRINT(("MessageDeliverer::TargetPort::PopMessage(): port: %ld, %p\n",
fPortID, fMessages.Head()->GetMessage()));
_RemoveMessage(fMessages.Head());
}
}
void DropTimedOutMessages()
{
bigtime_t now = system_time();
while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
if (message->GetMessage()->TimeoutTime() > now)
break;
PRINT(("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %ld: "
"message %p timed out\n", fPortID, message->GetMessage()));
_RemoveMessage(message);
}
}
bool IsEmpty() const
{
return !fFirstMessage;
return fMessages.IsEmpty();
}
private:
port_id fPortID;
TargetMessage *fFirstMessage;
TargetMessage *fLastMessage;
void _RemoveMessage(TargetMessage *message)
{
fMessages.Remove(message);
fMessageCount--;
fMessageSize -= message->GetMessage()->DataSize();
if (message->GetMessage()->HasTimeout())
fTimeoutableMessages.erase(message);
delete message;
}
void _EnforceLimits()
{
// message count
while (fMessageCount > kMaxMessagesPerPort) {
PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
"message count limit.\n", fPortID));
PopMessage();
}
// message size
while (fMessageSize > kMaxDataPerPort) {
PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
"message size limit.\n", fPortID));
PopMessage();
}
}
typedef DoublyLinkedList<TargetMessage> MessageList;
port_id fPortID;
MessageList fMessages;
int32 fMessageCount;
int32 fMessageSize;
set<TargetMessageHandle> fTimeoutableMessages;
};
// TargetPortMap
struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
};
// #pragma mark -
// constructor
MessageDeliverer::MessageDeliverer()
: fLock("message deliverer"),
@ -481,8 +588,11 @@ MessageDeliverer::_PutTargetPort(TargetPort *port)
status_t
MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
{
return BMessage::Private::SendFlattenedMessage(message->Data(),
status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
message->DataSize(), portID, token, (token < 0), 0);
//PRINT(("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
//message, portID, token, error));
return error;
}
// _DelivererThreadEntry
@ -508,15 +618,19 @@ MessageDeliverer::_DelivererThread()
TargetPort *port = it->second;
bool portError = false;
port->DropTimedOutMessages();
// try sending all messages
int32 token;
while (Message *message = port->PeekMessage(token)) {
status_t error = B_OK;
if (message->TimeoutTime() > system_time()) {
// if (message->TimeoutTime() > system_time()) {
error = _SendMessage(message, port->PortID(), token);
} else {
// timeout, drop message
}
// } else {
// // timeout, drop message
// PRINT(("MessageDeliverer::_DelivererThread(): port %ld, "
// "message %p timed out\n", port->PortID(), message));
// }
if (error == B_OK) {
port->PopMessage();
@ -530,7 +644,7 @@ MessageDeliverer::_DelivererThread()
}
// next port
if (portError) {
if (portError || port->IsEmpty()) {
TargetPortMap::iterator oldIt = it;
++it;
delete port;

View File

@ -42,6 +42,7 @@ public:
private:
class Message;
class TargetMessage;
class TargetMessageHandle;
class TargetPort;
struct TargetPortMap;