Minor tweak to BHandler::UnlockLooper()

Added calls to _init_message_(), _delete_message_(), and
_msg_cache_cleanup() to InitTerminateLibBe.cpp
Finished first implementation of BMessage::SendReply(), BMessage::_send_(),
and BMessage::_send_message()
Add BMessage to app.src, removed BBlockCache from support.src.
New BMessage::Private class has functions for twiddling BMessage internals


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@4371 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
ejakowatz 2003-08-25 07:55:52 +00:00
parent 5c7a44d436
commit e9e500cb9e
7 changed files with 512 additions and 36 deletions

View File

@ -292,11 +292,14 @@ virtual ~BMessage();
float FindFloat(const char *, int32 n = 0) const;
double FindDouble(const char *, int32 n = 0) const;
class Private;
private:
friend class BMessageQueue;
friend class BMessenger;
friend class BApplication;
friend class Private;
friend void _msg_cache_cleanup_();
friend BMessage *_reconstruct_msg_(uint32,uint32,uint32);

View File

@ -0,0 +1,64 @@
//------------------------------------------------------------------------------
// MessagePrivate.h
//
//------------------------------------------------------------------------------
#ifndef MESSAGEPRIVATE_H
#define MESSAGEPRIVATE_H
// Standard Includes -----------------------------------------------------------
// System Includes -------------------------------------------------------------
// Project Includes ------------------------------------------------------------
// Local Includes --------------------------------------------------------------
// Local Defines ---------------------------------------------------------------
// Globals ---------------------------------------------------------------------
extern "C" void _msg_cache_cleanup_();
extern "C" int _init_message_();
extern "C" int _delete_message_();
class BMessage::Private
{
public:
Private(BMessage* msg) : fMessage(msg) {;}
Private(BMessage& msg) : fMessage(&msg) {;}
inline void SetTarget(int32 token, bool preferred)
{
fMessage->fTarget = token;
fMessage->fPreferred = preferred;
}
inline void SetReply(BMessenger messenger)
{
fMessage->fReplyTo.port = messenger.fPort;
fMessage->fReplyTo.target = messenger.fHandlerToken;
fMessage->fReplyTo.team = messenger.fTeam;
fMessage->fReplyTo.preferred = messenger.fPreferredTarget;
}
inline int32 GetTarget()
{
return fMessage->fTarget;
}
inline bool UsePreferredTarget()
{
return fMessage->fPreferred;
}
private:
BMessage* fMessage;
};
#endif // MESSAGEPRIVATE_H
/*
* $Log $
*
* $Id $
*
*/

View File

@ -616,9 +616,10 @@ void BHandler::UnlockLooper()
re-Lock()ing the original looper just doesn't seem right.
*/
// TODO: implement correctly
if (fLooper)
BLooper* Looper = fLooper;
if (Looper)
{
fLooper->Unlock();
Looper->Unlock();
}
}
//------------------------------------------------------------------------------

View File

@ -25,6 +25,7 @@
//------------------------------------------------------------------------------
#include <stdio.h>
#include <MessagePrivate.h>
#include <RosterPrivate.h>
// debugging
@ -39,6 +40,7 @@ initialize_before()
{
DBG(OUT("initialize_before()\n"));
_init_message_();
_init_roster_();
DBG(OUT("initialize_before() done\n"));
@ -52,6 +54,8 @@ terminate_after()
DBG(OUT("terminate_after()\n"));
_delete_roster_();
_delete_message_();
_msg_cache_cleanup_();
DBG(OUT("terminate_after() done\n"));
}

View File

@ -34,6 +34,8 @@
#include <stdio.h>
// System Includes -------------------------------------------------------------
#include <Application.h>
#include <BlockCache.h>
#include <ByteOrder.h>
#include <Errors.h>
#include <Message.h>
@ -83,11 +85,48 @@ const char* B_SPECIFIER_ENTRY = "specifiers";
const char* B_PROPERTY_ENTRY = "property";
const char* B_PROPERTY_NAME_ENTRY = "name";
BBlockCache* BMessage::sMsgCache = NULL;
port_id BMessage::sReplyPorts[sNumReplyPorts];
long BMessage::sReplyPortInUse[sNumReplyPorts];
static status_t handle_reply(port_id reply_port,
int32* pCode,
bigtime_t timeout,
BMessage* reply);
//------------------------------------------------------------------------------
extern "C" {
void _msg_cache_cleanup_()
{
delete BMessage::sMsgCache;
BMessage::sMsgCache = NULL;
}
//------------------------------------------------------------------------------
int _init_message_()
{
BMessage::sReplyPorts[0] = create_port(1, "tmp_rport0");
BMessage::sReplyPorts[1] = create_port(1, "tmp_rport1");
BMessage::sReplyPorts[2] = create_port(1, "tmp_rport2");
BMessage::sReplyPortInUse[0] = 0;
BMessage::sReplyPortInUse[1] = 0;
BMessage::sReplyPortInUse[2] = 0;
return 0;
}
//------------------------------------------------------------------------------
int _delete_message_()
{
delete_port(BMessage::sReplyPorts[0]);
BMessage::sReplyPorts[0] = NULL;
delete_port(BMessage::sReplyPorts[1]);
BMessage::sReplyPorts[1] = NULL;
delete_port(BMessage::sReplyPorts[2]);
BMessage::sReplyPorts[2] = NULL;
return 0;
}
} // extern "C"
//------------------------------------------------------------------------------
BMessage *_reconstruct_msg_(uint32,uint32,uint32)
{
return NULL;
@ -95,6 +134,10 @@ BMessage *_reconstruct_msg_(uint32,uint32,uint32)
//------------------------------------------------------------------------------
#ifdef USING_TEMPLATE_MADNESS
void BMessage::_ReservedMessage1() {}
void BMessage::_ReservedMessage2() {}
void BMessage::_ReservedMessage3() {}
//------------------------------------------------------------------------------
BMessage::BMessage()
: what(0)
@ -181,7 +224,14 @@ void BMessage::init_data()
fReadOnly = false;
fHasSpecifiers = false;
fBody = new BPrivate::BMessageBody;
if (fBody)
{
fBody->MakeEmpty();
}
else
{
fBody = new BPrivate::BMessageBody;
}
}
//------------------------------------------------------------------------------
status_t BMessage::GetInfo(type_code typeRequested, int32 which, char** name,
@ -315,15 +365,120 @@ status_t BMessage::SendReply(uint32 command, BHandler* reply_to)
status_t BMessage::SendReply(BMessage* the_reply, BHandler* reply_to,
bigtime_t timeout)
{
BMessenger messenger(fReplyTo.team, fReplyTo.port, fReplyTo.target,
fReplyTo.preferred);
BMessenger messenger(reply_to);
return SendReply(the_reply, messenger, timeout);
}
//------------------------------------------------------------------------------
#if 0
template<class Sender>
status_t SendReplyHelper(BMessage* the_message, BMessage* the_reply,
Sender& the_sender)
{
BMessenger messenger(the_message->fReplyTo.team, the_message->fReplyTo.port,
the_message->fReplyTo.target,
the_message->fReplyTo.preferred);
if (the_message->fReplyRequired)
{
if (the_message->fReplyDone)
{
return B_DUPLICATE_REPLY;
}
the_message->fReplyDone = true;
the_reply->fIsReply = true;
status_t err = the_sender.Send(messenger, the_reply);
the_reply->fIsReply = false;
if (err)
{
if (set_port_owner(messenger.fPort, messenger.fTeam) == B_BAD_TEAM_ID)
{
delete_port(messenger.fPort);
}
}
return err;
}
// no reply required
if (!the_message->fWasDelivered)
{
return B_BAD_REPLY;
}
#if 0
char tmp[0x800];
ssize_t size;
char* p = stack_flatten(tmp, sizeof(tmp), true /* include reply */, &size);
the_reply->AddData("_previous_", B_RAW_TYPE, p ? p : tmp, &size);
if (p)
{
free(p);
}
#endif
the_reply->AddMessage("_previous_", the_message);
the_reply->fIsReply = true;
status_t err = the_sender.Send(messenger, the_reply);
the_reply->fIsReply = false;
the_reply->RemoveName("_previous_");
return err;
};
#endif
//------------------------------------------------------------------------------
#if 0
struct Sender1
{
BMessenger& reply_to;
bigtime_t timeout;
Sender1(BMessenger& m, bigtime_t t) : reply_to(m), timeout(t) {;}
status_t Send(BMessenger& messenger, BMessage* the_reply)
{
return messenger.SendMessage(the_reply, reply_to, timeout);
}
};
status_t BMessage::SendReply(BMessage* the_reply, BMessenger reply_to,
bigtime_t timeout)
{
// TODO: implement *
Sender1 mySender(reply_to, timeout);
return SendReplyHelper(this, the_reply, mySender);
}
#endif
status_t BMessage::SendReply(BMessage* the_reply, BMessenger reply_to,
bigtime_t timeout)
{
// TODO: test
BMessenger messenger(fReplyTo.team, fReplyTo.port,
fReplyTo.target,
fReplyTo.preferred);
if (fReplyRequired)
{
if (fReplyDone)
{
return B_DUPLICATE_REPLY;
}
fReplyDone = true;
the_reply->fIsReply = true;
status_t err = messenger.SendMessage(the_reply, reply_to, timeout);
the_reply->fIsReply = false;
if (err)
{
if (set_port_owner(messenger.fPort, messenger.fTeam) == B_BAD_TEAM_ID)
{
delete_port(messenger.fPort);
}
}
return err;
}
// no reply required
if (!fWasDelivered)
{
return B_BAD_REPLY;
}
the_reply->AddMessage("_previous_", this);
the_reply->fIsReply = true;
status_t err = messenger.SendMessage(the_reply, reply_to, timeout);
the_reply->fIsReply = false;
the_reply->RemoveName("_previous_");
return err;
}
//------------------------------------------------------------------------------
status_t BMessage::SendReply(uint32 command, BMessage* reply_to_reply)
@ -332,10 +487,69 @@ status_t BMessage::SendReply(uint32 command, BMessage* reply_to_reply)
return SendReply(&msg, reply_to_reply);
}
//------------------------------------------------------------------------------
#if 0
struct Sender2
{
BMessage* reply_to_reply;
bigtime_t send_timeout;
bigtime_t reply_timeout;
Sender2(BMessage* m, bigtime_t t1, bigtime_t t2)
: reply_to_reply(m), send_timeout(t1), reply_timeout(t2) {;}
status_t Send(BMessenger& messenger, BMessage* the_reply)
{
return messenger.SendMessage(the_reply, reply_to_reply,
send_timeout, reply_timeout);
}
};
status_t BMessage::SendReply(BMessage* the_reply, BMessage* reply_to_reply,
bigtime_t send_timeout, bigtime_t reply_timeout)
{
// TODO: implement *
Sender2 mySender(reply_to_reply, send_timeout, reply_timeout);
return SendReplyHelper(this, the_reply, mySender);
}
#endif
status_t BMessage::SendReply(BMessage* the_reply, BMessage* reply_to_reply,
bigtime_t send_timeout, bigtime_t reply_timeout)
{
// TODO: test
BMessenger messenger(fReplyTo.team, fReplyTo.port,
fReplyTo.target,
fReplyTo.preferred);
if (fReplyRequired)
{
if (fReplyDone)
{
return B_DUPLICATE_REPLY;
}
fReplyDone = true;
the_reply->fIsReply = true;
status_t err = messenger.SendMessage(the_reply, reply_to_reply,
send_timeout, reply_timeout);
the_reply->fIsReply = false;
if (err)
{
if (set_port_owner(messenger.fPort, messenger.fTeam) == B_BAD_TEAM_ID)
{
delete_port(messenger.fPort);
}
}
return err;
}
// no reply required
if (!fWasDelivered)
{
return B_BAD_REPLY;
}
the_reply->AddMessage("_previous_", this);
the_reply->fIsReply = true;
status_t err = messenger.SendMessage(the_reply, reply_to_reply,
send_timeout, reply_timeout);
the_reply->fIsReply = false;
the_reply->RemoveName("_previous_");
return err;
}
//------------------------------------------------------------------------------
ssize_t BMessage::FlattenedSize() const
@ -740,9 +954,6 @@ status_t BMessage::AddMessenger(const char* name, BMessenger messenger)
//------------------------------------------------------------------------------
status_t BMessage::AddRef(const char* name, const entry_ref* ref)
{
#if 0
return fBody->AddData<entry_ref>(name, *ref, B_REF_TYPE);
#endif
char* buffer = new(nothrow) char[sizeof (entry_ref) + B_PATH_NAME_LENGTH];
size_t size;
status_t err = entry_ref_flatten(buffer, &size, ref);
@ -757,9 +968,6 @@ status_t BMessage::AddRef(const char* name, const entry_ref* ref)
//------------------------------------------------------------------------------
status_t BMessage::AddMessage(const char* name, const BMessage* msg)
{
#if 0
return fBody->AddData<BMessage>(name, *msg, B_MESSAGE_TYPE);
#endif
status_t err = B_OK;
ssize_t size = msg->FlattenedSize();
char* buffer = new(nothrow) char[size];
@ -859,17 +1067,12 @@ status_t BMessage::AddData(const char* name, type_code type, const void* data,
break;
case B_REF_TYPE:
{
// err = AddRef(name, (entry_ref*)data);
BDataBuffer DB((void*)data, numBytes, true);
err = fBody->AddData<BDataBuffer>(name, DB, type);
break;
}
case B_MESSAGE_TYPE:
{
// BMessage msg;
// msg.Unflatten((const char*)data);
// err = AddMessage(name, &msg);
// err = AddMessage(name, (BMessage*)data);
BDataBuffer DB((void*)data, numBytes, true);
err = fBody->AddData<BDataBuffer>(name, DB, type);
break;
@ -967,9 +1170,6 @@ status_t BMessage::FindRef(const char* name, entry_ref* ref) const
//------------------------------------------------------------------------------
status_t BMessage::FindRef(const char* name, int32 index, entry_ref* ref) const
{
#if 0
return fBody->FindData<entry_ref>(name, index, ref, B_REF_TYPE);
#endif
void* data = NULL;
ssize_t size = 0;
status_t err = FindData(name, B_REF_TYPE, index, (const void**)&data, &size);
@ -988,9 +1188,6 @@ status_t BMessage::FindMessage(const char* name, BMessage* msg) const
//------------------------------------------------------------------------------
status_t BMessage::FindMessage(const char* name, int32 index, BMessage* msg) const
{
#if 0
return fBody->FindData<BMessage>(name, index, msg, B_MESSAGE_TYPE);
#endif
void* data = NULL;
ssize_t size = 0;
status_t err = FindData(name, B_MESSAGE_TYPE, index,
@ -1091,9 +1288,6 @@ status_t BMessage::ReplaceRef(const char* name, int32 index, const entry_ref* re
{
// TODO: test
// Use voidref's theoretical BDataBuffer
#if 0
return fBody->ReplaceData<entry_ref>(name, index, *ref, B_REF_TYPE);
#endif
char* buffer = new(nothrow) char[sizeof (entry_ref) + B_PATH_NAME_LENGTH];
size_t size;
status_t err = entry_ref_flatten(buffer, &size, ref);
@ -1114,9 +1308,6 @@ status_t BMessage::ReplaceMessage(const char* name, const BMessage* msg)
status_t BMessage::ReplaceMessage(const char* name, int32 index,
const BMessage* msg)
{
#if 0
return fBody->ReplaceData<BMessage>(name, index, *msg, B_MESSAGE_TYPE);
#endif
status_t err = B_OK;
ssize_t size = msg->FlattenedSize();
char* buffer = new(nothrow) char[size];
@ -1231,7 +1422,11 @@ status_t BMessage::ReplaceData(const char* name, type_code type, int32 index,
//------------------------------------------------------------------------------
void* BMessage::operator new(size_t size)
{
return ::new char[size];
if (!sMsgCache)
{
sMsgCache = new BBlockCache(10, size, B_OBJECT_CACHE);
}
return sMsgCache->Get(size);
}
//------------------------------------------------------------------------------
void* BMessage::operator new(size_t, void* p)
@ -1241,7 +1436,7 @@ void* BMessage::operator new(size_t, void* p)
//------------------------------------------------------------------------------
void BMessage::operator delete(void* ptr, size_t size)
{
::delete(ptr);
sMsgCache->Save(ptr, size);
}
//------------------------------------------------------------------------------
bool BMessage::HasFlat(const char* name, const BFlattenable* flat) const
@ -1502,6 +1697,24 @@ status_t BMessage::real_flatten(BDataIO* stream) const
return err;
}
//------------------------------------------------------------------------------
char* BMessage::stack_flatten(char* stack_ptr, ssize_t stack_size,
bool /*incl_reply*/, ssize_t* size) const
{
const ssize_t calcd_size = calc_hdr_size(0) + fBody->FlattenedSize();
char* new_ptr = NULL;
if (calcd_size > stack_size)
{
stack_ptr = new char[calcd_size];
new_ptr = stack_ptr;
}
real_flatten(stack_ptr, calcd_size);
if (size)
{
*size = calcd_size;
}
return new_ptr;
}
//------------------------------------------------------------------------------
ssize_t BMessage::calc_hdr_size(uchar flags) const
{
ssize_t size = min_hdr_size();
@ -1538,6 +1751,197 @@ ssize_t BMessage::min_hdr_size() const
return size;
}
//------------------------------------------------------------------------------
status_t BMessage::_send_(port_id port, int32 token, bool preferred,
bigtime_t timeout, bool reply_required,
BMessenger& reply_to) const
{
BMessage tmp_msg;
tmp_msg.fPreferred = fPreferred;
tmp_msg.fTarget = fTarget;
tmp_msg.fReplyRequired = fReplyRequired;
tmp_msg.fReplyTo = fReplyTo;
BMessage* self = const_cast<BMessage*>(this);
self->fPreferred = preferred;
self->fTarget = token;
self->fReplyRequired = reply_required;
self->fReplyTo.team = reply_to.fTeam;
self->fReplyTo.port = reply_to.fPort;
self->fReplyTo.target = reply_to.fHandlerToken;
self->fReplyTo.preferred = reply_to.fPreferredTarget;
char tmp[0x800];
ssize_t size;
char* p = stack_flatten(tmp, sizeof(tmp), true /* include reply */, &size);
char* pMem = p ? p : tmp;
status_t err;
do
{
err = write_port_etc(port, 'pjpp', pMem, size, 8, timeout);
} while (err == B_INTERRUPTED);
if (p)
{
delete[] p;
}
self->fPreferred = tmp_msg.fPreferred;
self->fTarget = tmp_msg.fTarget;
self->fReplyRequired = tmp_msg.fReplyRequired;
self->fReplyTo = tmp_msg.fReplyTo;
tmp_msg.init_data();
return err;
}
//------------------------------------------------------------------------------
status_t BMessage::send_message(port_id port, team_id port_owner, int32 token,
bool preferred, BMessage* reply,
bigtime_t send_timeout,
bigtime_t reply_timeout) const
{
const int32 cached_reply_port = sGetCachedReplyPort();
port_id reply_port;
status_t err;
if (cached_reply_port == -1)
{
// All the cached reply ports are in use; create a new one
reply_port = create_port(1 /* for one message */, "tmp_reply_port");
if (reply_port < 0)
{
return reply_port;
}
}
else
{
assert(cached_reply_port < sNumReplyPorts);
reply_port = sReplyPorts[cached_reply_port];
}
team_id team;
if (be_app)
{
team = be_app->Team();
}
else
{
port_info pi;
err = get_port_info(reply_port, &pi);
if (err)
{
goto error;
}
team = pi.team;
}
err = set_port_owner(reply_port, port_owner);
if (err)
{
goto error;
}
{
BMessenger messenger(team, reply_port, -1, false);
err = _send_(port, token, preferred, send_timeout, true, messenger);
}
if (err)
{
goto error;
}
int32 code;
err = handle_reply(reply_port, &code, reply_timeout, reply);
if (err && cached_reply_port >= 0)
{
delete_port(reply_port);
sReplyPorts[cached_reply_port] = create_port(1, "tmp_rport");
}
error:
if (cached_reply_port >= 0)
{
// Reclaim ownership of cached port
set_port_owner(reply_port, team);
// Flag as available
atomic_add(&sReplyPortInUse[cached_reply_port], -1);
return err;
}
delete_port(reply_port);
return err;
}
//------------------------------------------------------------------------------
int32 BMessage::sGetCachedReplyPort()
{
int index = -1;
for (int32 i = 0; i < sNumReplyPorts; i++)
{
int32 old = atomic_add(&(sReplyPortInUse[i]), 1);
if (old == 0)
{
// This entry is free
index = i;
break;
}
else
{
// This entry is being used.
atomic_add(&(sReplyPortInUse[i]), -1);
}
}
return index;
}
//------------------------------------------------------------------------------
static status_t handle_reply(port_id reply_port,
int32* pCode,
bigtime_t timeout,
BMessage* reply)
{
status_t err;
do
{
err = port_buffer_size_etc(reply_port, 8, timeout);
} while (err == B_INTERRUPTED);
if (err < 0)
{
return err;
}
// The API lied. It really isn't an error code, but the message size...
char* pAllocd = NULL;
char* pMem = NULL;
char tmp[0x800];
if (err < 0x800)
{
pMem = tmp;
}
else
{
pAllocd = new char[0x800];
pMem = pAllocd;
}
do
{
err = read_port(reply_port, pCode, pMem, err);
} while (err == B_INTERRUPTED);
if (err < 0)
{
return err;
}
if (*pCode = 'PUSH')
{
return B_ERROR;
}
if (*pCode != 'pjpp')
{
return B_OK;
}
err = reply->Unflatten(pMem);
// There seems to be a bug in the original Be implementation.
// It never free'd pAllocd !
if (pAllocd)
{
delete[] pAllocd;
}
return err;
}
//------------------------------------------------------------------------------
#else // USING_TEMPLATE_MADNESS
@ -2690,8 +3094,8 @@ status_t BMessage::real_flatten(BDataIO *stream, ssize_t size,
}
#endif
//------------------------------------------------------------------------------
char *BMessage::stack_flatten(char *stack_ptr, ssize_t stack_size,
bool incl_reply, ssize_t *size) const
char* BMessage::stack_flatten(char* stack_ptr, ssize_t stack_size,
bool incl_reply, ssize_t* size) const
{
return NULL;
}

View File

@ -11,7 +11,7 @@ APP_KIT_SOURCE =
Invoker.cpp
Looper.cpp
LooperList.cpp
# Message.cpp
Message.cpp
MessageBody.cpp
MessageField.cpp
MessageFilter.cpp

View File

@ -1,6 +1,6 @@
SUPPORT_KIT_SOURCE =
Archivable.cpp
BlockCache.cpp
# BlockCache.cpp
DataIO.cpp
BufferIO.cpp
Flattenable.cpp