Implemented direct message passing for local targets; this fixes a deadlock

with PostMessage() in case the message queue is full.
Some notes:
* for synchronous replies, we don't use this mechanism yet, but it could be
  extended to do that as well.
* the code looks so complicated because we need a way to access the looper's
  queue without locking it (to prevent deadlocks); like Dano's solution, I've
  abused BTokenSpace to store a BDirectMessageTarget with a BHandler.
* we also need to decouple the lifetime of a looper's queue from its target,
  as we cannot lock the looper, and therefore, can't guarantee it stays valid
  as long as we're accessing it outside of BLooper.
* init_clipboard() now needs to be done after the global constructors have
  been called - since sending messages now needs gDefaultTokens to be initialized.
  Since this is done per image, it shouldn't cause any troubles, though.
* some minor cleanup, removed unused _msg_cache_cleanup_() and friends.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@19968 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2007-01-26 13:59:56 +00:00
parent caa76e0da1
commit 9dbe170a69
14 changed files with 264 additions and 65 deletions

View File

@ -18,6 +18,7 @@
class BMessage;
class BMessageQueue;
namespace BPrivate {
class BDirectMessageTarget;
class BLooperList;
}
@ -150,7 +151,7 @@ private:
BHandler* resolve_specifier(BHandler* target, BMessage* msg);
void UnlockFully();
BMessageQueue* fQueue;
BPrivate::BDirectMessageTarget* fDirectTarget;
BMessage* fLastMessage;
port_id fMsgPort;
int32 fAtomicCount;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2005, Haiku Inc. All Rights Reserved.
* Copyright 2005-2007, Haiku Inc. All Rights Reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -22,13 +22,8 @@ class BMessenger;
class BHandler;
class BString;
// Private or reserved ---------------------------------------------------------
extern "C" void _msg_cache_cleanup_();
extern "C" int _init_message_();
extern "C" int _delete_message_();
//------------------------------------------------------------------------------
// Name lengths and Scripting specifiers ---------------------------------------
// Name lengths and Scripting specifiers
#define B_FIELD_NAME_LENGTH 255
#define B_PROPERTY_NAME_LENGTH 255
@ -57,7 +52,7 @@ class BMessage {
BMessage &operator=(const BMessage &other);
// Statistics and misc info
// Statistics and misc info
status_t GetInfo(type_code typeRequested, int32 index,
char **nameFound, type_code *typeFound,
int32 *countFound = NULL) const;
@ -74,7 +69,7 @@ class BMessage {
status_t Rename(const char *oldEntry, const char *newEntry);
// Delivery info
// Delivery info
bool WasDelivered() const;
bool IsSourceWaiting() const;
bool IsSourceRemote() const;
@ -83,7 +78,7 @@ class BMessage {
bool WasDropped() const;
BPoint DropPoint(BPoint *offset = NULL) const;
// Replying
// Replying
status_t SendReply(uint32 command, BHandler *replyTo = NULL);
status_t SendReply(BMessage *reply, BHandler *replyTo = NULL,
bigtime_t timeout = B_INFINITE_TIMEOUT);
@ -95,14 +90,14 @@ class BMessage {
bigtime_t sendTimeout = B_INFINITE_TIMEOUT,
bigtime_t replyTimeout = B_INFINITE_TIMEOUT);
// Flattening data
// Flattening data
ssize_t FlattenedSize() const;
status_t Flatten(char *buffer, ssize_t size) const;
status_t Flatten(BDataIO *stream, ssize_t *size = NULL) const;
status_t Unflatten(const char *flatBuffer);
status_t Unflatten(BDataIO *stream);
// Specifiers (scripting)
// Specifiers (scripting)
status_t AddSpecifier(const char *property);
status_t AddSpecifier(const char *property, int32 index);
status_t AddSpecifier(const char *property, int32 index, int32 range);
@ -116,7 +111,7 @@ class BMessage {
bool HasSpecifiers() const;
status_t PopSpecifier();
// Adding data
// Adding data
status_t AddRect(const char *name, BRect aRect);
status_t AddPoint(const char *name, BPoint aPoint);
status_t AddString(const char *name, const char *aString);
@ -138,12 +133,12 @@ class BMessage {
const void *data, ssize_t numBytes,
bool isFixedSize = true, int32 count = 1);
// Removing data
// Removing data
status_t RemoveData(const char *name, int32 index = 0);
status_t RemoveName(const char *name);
status_t MakeEmpty();
// Finding data
// Finding data
status_t FindRect(const char *name, BRect *rect) const;
status_t FindRect(const char *name, int32 index, BRect *rect) const;
status_t FindPoint(const char *name, BPoint *point) const;
@ -181,7 +176,7 @@ class BMessage {
status_t FindData(const char *name, type_code type, int32 index,
const void **data, ssize_t *numBytes) const;
// Replacing data
// Replacing data
status_t ReplaceRect(const char *name, BRect aRect);
status_t ReplaceRect(const char *name, int32 index, BRect aRect);
status_t ReplacePoint(const char *name, BPoint aPoint);
@ -223,7 +218,7 @@ class BMessage {
void *operator new(size_t, void *pointer);
void operator delete(void *pointer, size_t size);
// Private, reserved, or obsolete ----------------------------------------------
// Private, reserved, or obsolete
bool HasRect(const char *, int32 n = 0) const;
bool HasPoint(const char *, int32 n = 0) const;
bool HasString(const char *, int32 n = 0) const;
@ -303,8 +298,9 @@ class BMessage {
virtual void _ReservedMessage2();
virtual void _ReservedMessage3();
status_t _SendMessage(port_id port, int32 token, bigtime_t timeout,
bool replyRequired, BMessenger &replyTo) const;
status_t _SendMessage(port_id port, team_id portOwner, int32 token,
bigtime_t timeout, bool replyRequired,
BMessenger &replyTo) const;
status_t _SendMessage(port_id port, team_id portOwner,
int32 token, BMessage *reply, bigtime_t sendTimeout,
bigtime_t replyTimeout) const;

View File

@ -0,0 +1,38 @@
/*
* Copyright 2007, Haiku, Inc.
* Distributed under the terms of the MIT License.
*
* Authors:
* Axel Dörfler, axeld@pinc-software.de
*/
#ifndef _DIRECT_MESSAGE_TARGET_H
#define _DIRECT_MESSAGE_TARGET_H
#include <MessageQueue.h>
namespace BPrivate {
class BDirectMessageTarget {
public:
BDirectMessageTarget();
~BDirectMessageTarget();
bool AddMessage(BMessage* message);
void Close();
void Acquire();
void Release();
BMessageQueue* Queue() { return &fQueue; }
private:
int32 fReferenceCount;
BMessageQueue fQueue;
bool fClosed;
};
} // namespace BPrivate
#endif // _DIRECT_MESSAGE_TARGET_H

View File

@ -1,5 +1,5 @@
/*
* Copyright 2005, Haiku Inc. All rights reserved.
* Copyright 2005-2007, Haiku Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -200,10 +200,10 @@ class BMessage::Private {
}
status_t
SendMessage(port_id port, int32 token, bigtime_t timeout,
bool replyRequired, BMessenger &replyTo) const
SendMessage(port_id port, team_id portOwner, int32 token,
bigtime_t timeout, bool replyRequired, BMessenger &replyTo) const
{
return fMessage->_SendMessage(port, token,
return fMessage->_SendMessage(port, portOwner, token,
timeout, replyRequired, replyTo);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2001-2006, Haiku.
* Copyright 2001-2007, Haiku.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -10,13 +10,13 @@
#define _TOKEN_SPACE_H
#include <map>
#include <stack>
#include <BeBuild.h>
#include <Locker.h>
#include <SupportDefs.h>
#include <map>
#include <stack>
// token types as specified in targets
#define B_PREFERRED_TOKEN -2 /* A little bird told me about this one */
@ -30,6 +30,9 @@
namespace BPrivate {
class BDirectMessageTarget;
class BTokenSpace : public BLocker {
public:
BTokenSpace();
@ -40,12 +43,16 @@ class BTokenSpace : public BLocker {
bool RemoveToken(int32 token);
bool CheckToken(int32 token, int16 type) const;
status_t GetToken(int32 token, int16 type, void** object) const;
status_t GetToken(int32 token, int16 type, void** _object) const;
status_t SetHandlerTarget(int32 token, BDirectMessageTarget* target);
status_t AcquireHandlerTarget(int32 token, BDirectMessageTarget** _target);
private:
struct token_info {
int16 type;
void* object;
BDirectMessageTarget* target;
};
typedef std::map<int32, token_info> TokenMap;

View File

@ -0,0 +1,63 @@
/*
* Copyright 2007, Haiku, Inc.
* Distributed under the terms of the MIT License.
*
* Authors:
* Axel Dörfler, axeld@pinc-software.de
*/
#include <DirectMessageTarget.h>
namespace BPrivate {
BDirectMessageTarget::BDirectMessageTarget()
:
fReferenceCount(1),
fClosed(false)
{
}
BDirectMessageTarget::~BDirectMessageTarget()
{
}
bool
BDirectMessageTarget::AddMessage(BMessage* message)
{
if (fClosed) {
delete message;
return false;
}
fQueue.AddMessage(message);
return true;
}
void
BDirectMessageTarget::Close()
{
fClosed = true;
}
void
BDirectMessageTarget::Acquire()
{
atomic_add(&fReferenceCount, 1);
}
void
BDirectMessageTarget::Release()
{
if (atomic_add(&fReferenceCount, -1) == 1)
delete this;
}
} // namespace BPrivate

View File

@ -596,7 +596,8 @@ void
BHandler::SetLooper(BLooper *looper)
{
fLooper = looper;
gDefaultTokens.SetHandlerTarget(fToken, looper ? looper->fDirectTarget : NULL);
if (fFilters) {
for (int32 i = 0; i < fFilters->CountItems(); i++)
static_cast<BMessageFilter *>(fFilters->ItemAtFast(i))->SetLooper(looper);

View File

@ -29,12 +29,23 @@ initialize_before()
BMessage::Private::StaticInit();
BRoster::Private::InitBeRoster();
BPrivate::init_clipboard();
DBG(OUT("initialize_before() done\n"));
}
extern "C" void
initialize_after()
{
DBG(OUT("initialize_after()\n"));
BPrivate::init_clipboard();
// needs to send a message, and that requires gDefaultTokens to be initialized
DBG(OUT("initialize_after() done\n"));
}
extern "C" void
terminate_after()
{

View File

@ -32,6 +32,7 @@ MergeObject <libbe>app_kit.o :
Clipboard.cpp
dano_message.cpp
DesktopLink.cpp
DirectMessageTarget.cpp
Handler.cpp
InitTerminateLibBe.cpp
Invoker.cpp

View File

@ -12,6 +12,7 @@
/*! BLooper class spawns a thread that runs a message loop. */
#include <AppMisc.h>
#include <DirectMessageTarget.h>
#include <LooperList.h>
#include <MessagePrivate.h>
#include <ObjectLocker.h>
@ -130,8 +131,10 @@ BLooper::~BLooper()
// Clear the queue so our call to IsMessageWaiting() below doesn't give
// us bogus info
fDirectTarget->Close();
BMessage *message;
while ((message = fQueue->NextMessage()) != NULL) {
while ((message = fDirectTarget->Queue()->NextMessage()) != NULL) {
delete message;
// msg will automagically post generic reply
}
@ -141,7 +144,7 @@ BLooper::~BLooper()
// msg will automagically post generic reply
} while (IsMessageWaiting());
delete fQueue;
fDirectTarget->Release();
delete_port(fMsgPort);
// Clean up our filters
@ -294,7 +297,7 @@ BLooper::DetachCurrentMessage()
BMessageQueue*
BLooper::MessageQueue() const
{
return fQueue;
return fDirectTarget->Queue();
}
@ -303,7 +306,7 @@ BLooper::IsMessageWaiting() const
{
AssertLocked();
if (!fQueue->IsEmpty())
if (!fDirectTarget->Queue()->IsEmpty())
return true;
int32 count;
@ -933,7 +936,7 @@ BLooper::_InitData(const char *name, int32 priority, int32 portCapacity)
{
fOwner = B_ERROR;
fRunCalled = false;
fQueue = new BMessageQueue();
fDirectTarget = new (std::nothrow) BPrivate::BDirectMessageTarget();
fCommonFilters = NULL;
fLastMessage = NULL;
fPreferred = NULL;
@ -970,7 +973,7 @@ BLooper::AddMessage(BMessage* message)
// wakeup looper when being called from other threads if necessary
if (find_thread(NULL) != Thread()
&& fQueue->IsNextMessage(message) && port_count(fMsgPort) <= 0) {
&& fDirectTarget->Queue()->IsNextMessage(message) && port_count(fMsgPort) <= 0) {
// there is currently no message waiting, and we need to wakeup the looper
write_port_etc(fMsgPort, 0, NULL, 0, B_RELATIVE_TIMEOUT, 0);
}
@ -984,7 +987,7 @@ BLooper::_AddMessagePriv(BMessage* msg)
// Others may want to peek into our message queue, so the preferred
// handler must be set correctly already if no token was given
fQueue->AddMessage(msg);
fDirectTarget->Queue()->AddMessage(msg);
}
@ -1122,7 +1125,7 @@ BLooper::task_looper()
while (!fTerminating && dispatchNextMessage) {
PRINT(("LOOPER: inner loop\n"));
// Get next message from queue (assign to fLastMessage)
fLastMessage = fQueue->NextMessage();
fLastMessage = fDirectTarget->Queue()->NextMessage();
Lock();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2005-2006, Haiku Inc. All rights reserved.
* Copyright 2005-2007, Haiku Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -14,18 +14,21 @@
#include <MessagePrivate.h>
#include <MessageUtils.h>
#include <DirectMessageTarget.h>
#include <MessengerPrivate.h>
#include <TokenSpace.h>
#include <util/KMessage.h>
#include <Application.h>
#include <AppMisc.h>
#include <BlockCache.h>
#include <Entry.h>
#include <MessageQueue.h>
#include <Messenger.h>
#include <MessengerPrivate.h>
#include <Path.h>
#include <Point.h>
#include <Rect.h>
#include <String.h>
#include <TokenSpace.h>
#include <util/KMessage.h>
#include <ctype.h>
#include <malloc.h>
@ -1837,8 +1840,8 @@ BMessage::_StaticGetCachedReplyPort()
status_t
BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
bool replyRequired, BMessenger &replyTo) const
BMessage::_SendMessage(port_id port, team_id portOwner, int32 token,
bigtime_t timeout, bool replyRequired, BMessenger &replyTo) const
{
DEBUG_FUNCTION_ENTER;
ssize_t size = 0;
@ -1846,7 +1849,24 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
message_header *header = NULL;
status_t result;
if (/*fHeader->fields_size + fHeader->data_size > B_PAGE_SIZE*/false) {
BPrivate::BDirectMessageTarget* direct = NULL;
BMessage* copy = NULL;
if (portOwner == BPrivate::current_team())
BPrivate::gDefaultTokens.AcquireHandlerTarget(token, &direct);
if (direct != NULL) {
// We have a direct local message target - we can just enqueue the message
// in its message queue. This will also prevent possible deadlocks when the
// queue is full.
copy = new BMessage(*this);
if (copy != NULL) {
header = copy->fHeader;
result = B_OK;
} else {
direct->Release();
result = B_NO_MEMORY;
}
} else if (/*fHeader->fields_size + fHeader->data_size > B_PAGE_SIZE*/false) {
result = _FlattenToArea(&header);
buffer = (char *)header;
size = sizeof(message_header);
@ -1892,10 +1912,22 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
header->reply_target = replyToPrivate.Token();
header->flags |= MESSAGE_FLAG_WAS_DELIVERED;
do {
result = write_port_etc(port, kPortMessageCode, (void *)buffer, size,
B_RELATIVE_TIMEOUT, timeout);
} while (result == B_INTERRUPTED);
if (direct != NULL) {
// this is a local message transmission
direct->AddMessage(copy);
if (direct->Queue()->IsNextMessage(copy) && port_count(port) <= 0) {
// there is currently no message waiting, and we need to wakeup the looper
write_port_etc(port, 0, NULL, 0, B_RELATIVE_TIMEOUT, 0);
}
direct->Release();
} else {
do {
result = write_port_etc(port, kPortMessageCode, (void *)buffer, size,
B_RELATIVE_TIMEOUT, timeout);
} while (result == B_INTERRUPTED);
}
if (result == B_OK && IsSourceWaiting()) {
// the forwarded message will handle the reply - we must not do
@ -1908,6 +1940,9 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
}
/*!
Sends a message and waits synchronously for a reply.
*/
status_t
BMessage::_SendMessage(port_id port, team_id portOwner, int32 token,
BMessage *reply, bigtime_t sendTimeout, bigtime_t replyTimeout) const
@ -1961,7 +1996,9 @@ BMessage::_SendMessage(port_id port, team_id portOwner, int32 token,
BMessenger replyTarget;
BMessenger::Private(replyTarget).SetTo(team, replyPort,
B_PREFERRED_TOKEN);
result = _SendMessage(port, token, sendTimeout, true, replyTarget);
// TODO: replying could also use a BDirectMessageTarget like mechanism for local targets
result = _SendMessage(port, -1, token, sendTimeout, true,
replyTarget);
}
if (result < B_OK)

View File

@ -327,7 +327,7 @@ BMessenger::SendMessage(BMessage *message, BMessenger replyTo,
if (!message)
return B_BAD_VALUE;
return BMessage::Private(message).SendMessage(fPort, fHandlerToken,
return BMessage::Private(message).SendMessage(fPort, fTeam, fHandlerToken,
timeout, false, replyTo);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2001-2006, Haiku.
* Copyright 2001-2007, Haiku.
* Distributed under the terms of the MIT License.
*
* Authors:
@ -8,13 +8,11 @@
*/
#include <map>
#include <stack>
#include <DirectMessageTarget.h>
#include <TokenSpace.h>
#include <Autolock.h>
#include "TokenSpace.h"
namespace BPrivate {
@ -39,7 +37,7 @@ BTokenSpace::NewToken(int16 type, void* object)
{
BAutolock locker(this);
token_info tokenInfo = { type, object };
token_info tokenInfo = { type, object, NULL };
int32 token = fTokenCount++;
fTokenMap[token] = tokenInfo;
@ -59,7 +57,7 @@ BTokenSpace::SetToken(int32 token, int16 type, void* object)
{
BAutolock locker(this);
token_info tokenInfo = { type, object };
token_info tokenInfo = { type, object, NULL };
fTokenMap[token] = tokenInfo;
// this makes sure SetToken() plays more or less nice with NewToken()
@ -101,13 +99,12 @@ BTokenSpace::CheckToken(int32 token, int16 type) const
status_t
BTokenSpace::GetToken(int32 token, int16 type, void** _object) const
{
BAutolock locker(const_cast<BTokenSpace&>(*this));
if (token < 1)
return B_ENTRY_NOT_FOUND;
TokenMap::const_iterator iterator = fTokenMap.find(token);
BAutolock locker(const_cast<BTokenSpace&>(*this));
TokenMap::const_iterator iterator = fTokenMap.find(token);
if (iterator == fTokenMap.end() || iterator->second.type != type)
return B_ENTRY_NOT_FOUND;
@ -115,4 +112,47 @@ BTokenSpace::GetToken(int32 token, int16 type, void** _object) const
return B_OK;
}
status_t
BTokenSpace::SetHandlerTarget(int32 token, BDirectMessageTarget* target)
{
if (token < 1)
return B_ENTRY_NOT_FOUND;
BAutolock locker(const_cast<BTokenSpace&>(*this));
TokenMap::iterator iterator = fTokenMap.find(token);
if (iterator == fTokenMap.end() || iterator->second.type != B_HANDLER_TOKEN)
return B_ENTRY_NOT_FOUND;
if (iterator->second.target != NULL)
iterator->second.target->Release();
iterator->second.target = target;
if (target != NULL)
target->Acquire();
return B_OK;
}
status_t
BTokenSpace::AcquireHandlerTarget(int32 token, BDirectMessageTarget** _target)
{
if (token < 1)
return B_ENTRY_NOT_FOUND;
BAutolock locker(const_cast<BTokenSpace&>(*this));
TokenMap::const_iterator iterator = fTokenMap.find(token);
if (iterator == fTokenMap.end() || iterator->second.type != B_HANDLER_TOKEN)
return B_ENTRY_NOT_FOUND;
if (iterator->second.target != NULL)
iterator->second.target->Acquire();
*_target = iterator->second.target;
return B_OK;
}
} // namespace BPrivate

View File

@ -25,6 +25,7 @@
#include <input_globals.h>
#include <AppMisc.h>
#include <ApplicationPrivate.h>
#include <DirectMessageTarget.h>
#include <InputServerTypes.h>
#include <MenuPrivate.h>
#include <MessagePrivate.h>
@ -2488,7 +2489,7 @@ BWindow::_DequeueAll()
for (int32 i = 0; i < count; i++) {
BMessage *message = MessageFromPort(0);
if (message != NULL)
fQueue->AddMessage(message);
fDirectTarget->Queue()->AddMessage(message);
}
}
@ -2536,8 +2537,8 @@ BWindow::task_looper()
bool dispatchNextMessage = true;
while (!fTerminating && dispatchNextMessage) {
// Get next message from queue (assign to fLastMessage)
fLastMessage = fQueue->NextMessage();
// Get next message from queue (assign to fLastMessage)
fLastMessage = fDirectTarget->Queue()->NextMessage();
// Lock the looper
if (!Lock())