* Added a timeout to the delivery functions. This is the time after which

the message will be finally dropped. Makes sense for periodic message
  runners for instance.
* Set the target of a BMessage before flattening it. Thus there will be
  space in the flattened header for it.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@11071 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2005-01-26 15:18:29 +00:00
parent 7f1ee731a9
commit d306a4652c
2 changed files with 49 additions and 18 deletions

View File

@ -26,13 +26,19 @@ static const bigtime_t kRetryDelay = 20000; // 20 ms
// Message
class MessageDeliverer::Message : public Referenceable {
public:
Message(void *data, int32 dataSize)
Message(void *data, int32 dataSize, bigtime_t timeout)
: Referenceable(true),
fData(data),
fDataSize(dataSize),
fCreationTime(system_time()),
fBusy(false)
{
if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
fTimeoutTime = B_INFINITE_TIMEOUT;
else if (timeout <= 0)
fTimeoutTime = fCreationTime;
else
fTimeoutTime = fCreationTime + timeout;
}
~Message()
@ -50,11 +56,16 @@ public:
return fDataSize;
}
bigtime_t CreationTime()
bigtime_t CreationTime() const
{
return fCreationTime;
}
bigtime_t TimeoutTime() const
{
return fTimeoutTime;
}
void SetBusy(bool busy)
{
fBusy = busy;
@ -69,6 +80,7 @@ private:
void *fData;
int32 fDataSize;
bigtime_t fCreationTime;
bigtime_t fTimeoutTime;
bool fBusy;
};
@ -274,21 +286,30 @@ MessageDeliverer::Default()
// DeliverMessage
status_t
MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target)
MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
bigtime_t timeout)
{
BMessenger::Private messengerPrivate(target);
return DeliverMessage(message, messengerPrivate.Port(),
messengerPrivate.IsPreferredTarget()
? B_PREFERRED_TOKEN : messengerPrivate.Token());
? B_PREFERRED_TOKEN : messengerPrivate.Token(),
timeout);
}
// DeliverMessage
status_t
MessageDeliverer::DeliverMessage(BMessage *message, port_id port, int32 token)
MessageDeliverer::DeliverMessage(BMessage *message, port_id port, int32 token,
bigtime_t timeout)
{
if (!message)
return B_BAD_VALUE;
// Set the token now, so that the header contains room for it.
// It will be set when sending the message anyway, but if it is not set
// before flattening, the header will not contain room for it, and it
// will not possible to send the message flattened later.
BMessage::Private(message).SetTarget(token, (token < 0));
// flatten the message
BMallocIO mallocIO;
status_t error = message->Flatten(&mallocIO);
@ -296,35 +317,36 @@ MessageDeliverer::DeliverMessage(BMessage *message, port_id port, int32 token)
return error;
return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), port,
token);
token, timeout);
}
// DeliverMessage
status_t
MessageDeliverer::DeliverMessage(const void *message, int32 messageSize,
BMessenger target)
BMessenger target, bigtime_t timeout)
{
BMessenger::Private messengerPrivate(target);
return DeliverMessage(message, messageSize, messengerPrivate.Port(),
messengerPrivate.IsPreferredTarget()
? B_PREFERRED_TOKEN : messengerPrivate.Token());
? B_PREFERRED_TOKEN : messengerPrivate.Token(),
timeout);
}
// DeliverMessage
status_t
MessageDeliverer::DeliverMessage(const void *message, int32 messageSize,
port_id port, int32 token)
port_id port, int32 token, bigtime_t timeout)
{
messaging_target target;
target.port = port;
target.token = token;
return DeliverMessage(message, messageSize, &target, 1);
return DeliverMessage(message, messageSize, &target, 1, timeout);
}
// DeliverMessage
status_t
MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
const messaging_target *targets, int32 targetCount)
const messaging_target *targets, int32 targetCount, bigtime_t timeout)
{
if (!messageData || messageSize <= 0)
return B_BAD_VALUE;
@ -336,7 +358,7 @@ MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
memcpy(data, messageData, messageSize);
// create a Message
Message *message = new(nothrow) Message(data, messageSize);
Message *message = new(nothrow) Message(data, messageSize, timeout);
if (!message) {
free(data);
return B_NO_MEMORY;
@ -448,7 +470,13 @@ MessageDeliverer::_DelivererThread()
// try sending all messages
int32 token;
while (Message *message = port->PeekMessage(token)) {
status_t error = _SendMessage(message, port->PortID(), token);
status_t error = B_OK;
if (message->TimeoutTime() > system_time()) {
error = _SendMessage(message, port->PortID(), token);
} else {
// timeout, drop message
}
if (error == B_OK) {
port->PopMessage();
} else if (error == B_WOULD_BLOCK) {

View File

@ -23,14 +23,17 @@ public:
static void DeleteDefault();
static MessageDeliverer *Default();
status_t DeliverMessage(BMessage *message, BMessenger target);
status_t DeliverMessage(BMessage *message, port_id port, int32 token);
status_t DeliverMessage(BMessage *message, BMessenger target,
bigtime_t timeout = B_INFINITE_TIMEOUT);
status_t DeliverMessage(BMessage *message, port_id port, int32 token,
bigtime_t timeout = B_INFINITE_TIMEOUT);
status_t DeliverMessage(const void *message, int32 messageSize,
BMessenger target);
BMessenger target, bigtime_t timeout = B_INFINITE_TIMEOUT);
status_t DeliverMessage(const void *message, int32 messageSize,
port_id port, int32 token);
port_id port, int32 token, bigtime_t timeout = B_INFINITE_TIMEOUT);
status_t DeliverMessage(const void *message, int32 messageSize,
const messaging_target *targets, int32 targetCount);
const messaging_target *targets, int32 targetCount,
bigtime_t timeout = B_INFINITE_TIMEOUT);
private:
class Message;