* Fixed some more bugs in the message passing by area

* Reordered some functions
* Area messages can now be unflattened instead of using the private _Reference and a special port code

Passing by area is now mostly working but it's not yet enabled. I will have to conduct performance tests first to see if and starting at what messagesize the overhead is reasonable.

git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@16500 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Michael Lotz 2006-02-23 09:02:03 +00:00
parent 43d0f55b38
commit 6990418519
4 changed files with 235 additions and 262 deletions

View File

@ -291,13 +291,14 @@ class BMessage {
message_header* fHeader;
field_header* fFields;
uint8* fData;
area_id fClonedArea;
mutable BMessage* fOriginal;
BMessage* fQueueLink;
// fQueueLink is used by BMessageQueue to build a linked list
uint32 fReserved[11];
uint32 fReserved[10];
// deprecated
BMessage(BMessage *message);

View File

@ -20,8 +20,7 @@
#define MAX_ITEM_PREALLOCATION B_PAGE_SIZE
static const int32 kPortMessageCodeDirect = 'pjpp';
static const int32 kPortMessageCodeByArea = 'pjp2';
static const int32 kPortMessageCode = 'pjpp';
enum {
@ -31,7 +30,8 @@ enum {
MESSAGE_FLAG_IS_REPLY = 0x0008,
MESSAGE_FLAG_WAS_DELIVERED = 0x0010,
MESSAGE_FLAG_HAS_SPECIFIERS = 0x0020,
MESSAGE_FLAG_WAS_DROPPED = 0x0080
MESSAGE_FLAG_WAS_DROPPED = 0x0080,
MESSAGE_FLAG_PASS_BY_AREA = 0x0100
};
@ -182,12 +182,6 @@ class BMessage::Private {
return fMessage->_FlattenToArea(header);
}
status_t
Reference(message_header *header)
{
return fMessage->_Reference(header);
}
status_t
SendMessage(port_id port, int32 token, bigtime_t timeout,
bool replyRequired, BMessenger &replyTo) const

View File

@ -1171,19 +1171,6 @@ BLooper::ConvertToMessage(void *buffer, int32 code)
return NULL;
BMessage *message = new BMessage();
#ifdef USING_MESSAGE4
if (code == kPortMessageCodeByArea) {
BMessage::Private messagePrivate(message);
if (messagePrivate.Reference((BMessage::message_header *)buffer) != B_OK) {
PRINT(("BLooper::ConvertToMessage(): referencing message failed\n"));
delete message;
message = NULL;
}
return message;
}
#endif
if (message->Unflatten((const char *)buffer) != B_OK) {
PRINT(("BLooper::ConvertToMessage(): unflattening message failed\n"));
delete message;

View File

@ -161,6 +161,8 @@ BMessage::_InitCommon()
fFields = NULL;
fData = NULL;
fClonedArea = -1;
fOriginal = NULL;
fQueueLink = NULL;
return B_OK;
@ -196,7 +198,7 @@ status_t
BMessage::_Clear()
{
DEBUG_FUNCTION_ENTER;
if (fHeader && fHeader->shared_area >= B_OK)
if (fClonedArea >= B_OK)
_Dereference();
free(fHeader);
@ -532,7 +534,7 @@ BMessage::Rename(const char *oldEntry, const char *newEntry)
if (!oldEntry || !newEntry)
return B_BAD_VALUE;
if (fHeader->shared_area >= B_OK)
if (fClonedArea >= B_OK)
_CopyForWrite();
uint32 hash = _HashName(oldEntry) % fHeader->hash_table_size;
@ -861,6 +863,155 @@ BMessage::_NativeFlatten(BDataIO *stream, ssize_t *size) const
}
/* The concept of message sending by area:
The traditional way of sending a message is to send it by flattening it to
a buffer, pushing it through a port, reading it into the outputbuffer and
unflattening it from there (copying the data again). While this works ok
for small messages it does not make any sense for larger ones and may even
hit some port capacity limit.
Often in the life of a BMessage, it will be sent to someone. Almost as
often the one receiving the message will not need to change the message
in any way, but uses it "read only" to get information from it. This means
that all that copying is pretty pointless in the first place since we
could simply pass the original buffers on.
It's obviously not exactly as simple as this, since we cannot just use the
memory of one application in another - but we can share areas with
eachother.
Therefore instead of flattening into a buffer, we copy the message data
into an area, put this information into the message header and only push
this through the port. The receiving looper then builds a BMessage from
the header, that only references the data in the area (not copying it),
allowing read only access to it.
Only if write access is necessary the message will be copyed from the area
to its own buffers (like in the unflatten step before).
The double copying is reduced to a single copy in most cases and we safe
the slower route of moving the data through a port.
Additionally we save us the reference counting with the use of areas that
are reference counted internally. So we don't have to worry about leaving
an area behind or deleting one that is still in use.
*/
status_t
BMessage::_FlattenToArea(message_header **_header) const
{
DEBUG_FUNCTION_ENTER;
message_header *header = (message_header *)malloc(sizeof(message_header));
memcpy(header, fHeader, sizeof(message_header));
header->what = what;
header->fields_available = 0;
header->data_available = 0;
header->flags |= MESSAGE_FLAG_PASS_BY_AREA;
*_header = header;
if (header->shared_area >= B_OK)
return B_OK;
if (header->fields_size == 0 && header->data_size == 0)
return B_OK;
uint8 *address = NULL;
ssize_t size = header->fields_size + header->data_size;
size = (size + B_PAGE_SIZE) & ~(B_PAGE_SIZE - 1);
area_id area = create_area("Shared BMessage data", (void **)&address,
B_ANY_ADDRESS, size, B_NO_LOCK, B_READ_AREA | B_WRITE_AREA);
if (area < B_OK) {
free(header);
*_header = NULL;
return area;
}
if (header->fields_size > 0) {
memcpy(address, fFields, header->fields_size);
header->fields_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->fields_size);
address += header->fields_size;
}
if (header->data_size > 0) {
memcpy(address, fData, header->data_size);
header->data_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->data_size);
}
header->shared_area = area;
return B_OK;
}
status_t
BMessage::_Reference(message_header *header)
{
DEBUG_FUNCTION_ENTER;
fHeader = header;
fHeader->flags &= ~MESSAGE_FLAG_PASS_BY_AREA;
/* if there is no data at all we don't need the area */
if (fHeader->fields_size == 0 && header->data_size == 0)
return B_OK;
uint8 *address = NULL;
area_id clone = clone_area("Cloned BMessage data", (void **)&address,
B_ANY_ADDRESS, B_READ_AREA, fHeader->shared_area);
if (clone < B_OK) {
free(fHeader);
fHeader = NULL;
_InitHeader();
return clone;
}
fClonedArea = clone;
fFields = (field_header *)address;
address += fHeader->fields_size;
fData = address;
return B_OK;
}
status_t
BMessage::_Dereference()
{
DEBUG_FUNCTION_ENTER;
delete_area(fClonedArea);
fClonedArea = -1;
fFields = NULL;
fData = NULL;
return B_OK;
}
status_t
BMessage::_CopyForWrite()
{
DEBUG_FUNCTION_ENTER;
if (fClonedArea < B_OK)
return B_OK;
field_header *newFields = NULL;
uint8 *newData = NULL;
if (fHeader->fields_size > 0) {
newFields = (field_header *)malloc(fHeader->fields_size);
memcpy(newFields, fFields, fHeader->fields_size);
}
if (fHeader->data_size > 0) {
newData = (uint8 *)malloc(fHeader->data_size);
memcpy(newData, fData, fHeader->data_size);
}
_Dereference();
fHeader->fields_available = 0;
fHeader->data_available = 0;
fFields = newFields;
fData = newData;
return B_OK;
}
status_t
BMessage::Unflatten(const char *flatBuffer)
{
@ -903,38 +1054,46 @@ BMessage::Unflatten(const char *flatBuffer)
memcpy(fHeader, flatBuffer, sizeof(message_header));
flatBuffer += sizeof(message_header);
fHeader->shared_area = -1;
fHeader->fields_available = 0;
fHeader->data_available = 0;
what = fHeader->what;
if (fHeader->format != kMessageMagic4 ||
!(fHeader->flags & MESSAGE_FLAG_VALID)) {
_Clear();
if (fHeader->format != kMessageMagic4
|| !(fHeader->flags & MESSAGE_FLAG_VALID)) {
free(fHeader);
fHeader = NULL;
_InitHeader();
return B_BAD_VALUE;
}
if (fHeader->fields_size > 0) {
fFields = (field_header *)malloc(fHeader->fields_size);
if (!fFields)
return B_NO_MEMORY;
fHeader->fields_available = 0;
fHeader->data_available = 0;
what = fHeader->what;
memcpy(fFields, flatBuffer, fHeader->fields_size);
flatBuffer += fHeader->fields_size;
}
if (fHeader->flags & MESSAGE_FLAG_PASS_BY_AREA) {
status_t result = _Reference(fHeader);
if (result < B_OK)
return result;
} else {
fHeader->shared_area = -1;
if (fHeader->data_size > 0) {
fData = (uint8 *)malloc(fHeader->data_size);
if (!fData)
return B_NO_MEMORY;
if (fHeader->fields_size > 0) {
fFields = (field_header *)malloc(fHeader->fields_size);
if (!fFields)
return B_NO_MEMORY;
memcpy(fData, flatBuffer, fHeader->data_size);
memcpy(fFields, flatBuffer, fHeader->fields_size);
flatBuffer += fHeader->fields_size;
}
if (fHeader->data_size > 0) {
fData = (uint8 *)malloc(fHeader->data_size);
if (!fData)
return B_NO_MEMORY;
memcpy(fData, flatBuffer, fHeader->data_size);
}
}
if (fHeader->fields_checksum != BPrivate::CalculateChecksum((uint8 *)fFields, fHeader->fields_size)
|| fHeader->data_checksum != BPrivate::CalculateChecksum((uint8 *)fData, fHeader->data_size)) {
debug_printf("checksum mismatch\n");
debug_printf("checksum mismatch 2\n");
_Clear();
_InitHeader();
return B_BAD_VALUE;
@ -975,54 +1134,52 @@ BMessage::Unflatten(BDataIO *stream)
uint8 *header = (uint8 *)fHeader;
ssize_t result = stream->Read(header + sizeof(uint32),
sizeof(message_header) - sizeof(uint32));
result -= sizeof(message_header) - sizeof(uint32);
fHeader->shared_area = -1;
fHeader->fields_available = 0;
fHeader->data_available = 0;
what = fHeader->what;
if (result != sizeof(message_header) - sizeof(uint32)) {
_Clear();
_InitHeader();
return (result >= 0 ? B_ERROR : result);
}
if (fHeader->format != kMessageMagic4 ||
!(fHeader->flags & MESSAGE_FLAG_VALID)) {
_Clear();
if (result != B_OK || fHeader->format != kMessageMagic4
|| !(fHeader->flags & MESSAGE_FLAG_VALID)) {
free(fHeader);
fHeader = NULL;
_InitHeader();
return B_BAD_VALUE;
}
if (fHeader->fields_size > 0) {
fFields = (field_header *)malloc(fHeader->fields_size);
if (!fFields)
return B_NO_MEMORY;
fHeader->fields_available = 0;
fHeader->data_available = 0;
what = fHeader->what;
result = stream->Read(fFields, fHeader->fields_size);
if (result != fHeader->fields_size) {
_Clear();
_InitHeader();
return (result >= 0 ? B_ERROR : result);
if (fHeader->flags & MESSAGE_FLAG_PASS_BY_AREA) {
result = _Reference(fHeader);
if (result < B_OK)
return result;
} else {
fHeader->shared_area = -1;
if (result == B_OK && fHeader->fields_size > 0) {
fFields = (field_header *)malloc(fHeader->fields_size);
if (!fFields)
return B_NO_MEMORY;
result = stream->Read(fFields, fHeader->fields_size);
result -= fHeader->fields_size;
}
}
if (fHeader->data_size > 0) {
fData = (uint8 *)malloc(fHeader->data_size);
if (!fData)
return B_NO_MEMORY;
if (result == B_OK && fHeader->data_size > 0) {
fData = (uint8 *)malloc(fHeader->data_size);
if (!fData)
return B_NO_MEMORY;
result = stream->Read(fData, fHeader->data_size);
if (result != fHeader->data_size) {
_Clear();
_InitHeader();
return (result >= 0 ? B_ERROR : result);
result = stream->Read(fData, fHeader->data_size);
result -= fHeader->data_size;
}
if (result < B_OK)
return B_BAD_VALUE;
}
if (fHeader->fields_checksum != BPrivate::CalculateChecksum((uint8 *)fFields, fHeader->fields_size)
|| fHeader->data_checksum != BPrivate::CalculateChecksum((uint8 *)fData, fHeader->data_size)) {
debug_printf("checksum mismatch\n");
debug_printf("checksum mismatch 2\n");
_Clear();
_InitHeader();
return B_BAD_VALUE;
@ -1191,162 +1348,6 @@ BMessage::PopSpecifier()
}
/* The concept of message sending by area:
The traditional way of sending a message is to send it by flattening it to
a buffer, pushing it through a port, reading it into the outputbuffer and
unflattening it from there (copying the data again). While this works ok
for small messages it does not make any sense for larger ones and may even
hit some port capacity limit.
Often in the life of a BMessage, it will be sent to someone. Almost as
often the one receiving the message will not need to change the message
in any way, but uses it "read only" to get information from it. This means
that all that copying is pretty pointless in the first place since we
could simply pass the original buffers on.
It's obviously not exactly as simple as this, since we cannot just use the
memory of one application in another - but we can share areas with
eachother.
Therefore instead of flattening into a buffer, we copy the message data
into an area, put this information into the message header and only push
this through the port. The receiving looper then builds a BMessage from
the header, that only references the data in the area (not copying it),
allowing read only access to it.
Only if write access is necessary the message will be copyed from the area
to its own buffers (like in the unflatten step before).
The double copying is reduced to a single copy in most cases and we safe
the slower route of moving the data through a port.
Additionally we save us the reference counting with the use of areas that
are reference counted internally. So we don't have to worry about leaving
an area behind or deleting one that is still in use.
*/
status_t
BMessage::_FlattenToArea(message_header **_header) const
{
DEBUG_FUNCTION_ENTER;
message_header *header = (message_header *)malloc(sizeof(message_header));
memcpy(header, fHeader, sizeof(message_header));
header->what = what;
header->fields_available = 0;
header->data_available = 0;
*_header = header;
if (header->shared_area >= B_OK)
return B_OK;
if (header->fields_size == 0 && header->data_size == 0)
return B_OK;
uint8 *address = NULL;
ssize_t size = header->fields_size + header->data_size;
size = (size + B_PAGE_SIZE) & ~(B_PAGE_SIZE - 1);
area_id area = create_area("Shared BMessage data", (void **)&address,
B_ANY_ADDRESS, size, B_NO_LOCK, B_READ_AREA | B_WRITE_AREA);
if (area < B_OK) {
free(header);
*_header = NULL;
return area;
}
if (header->fields_size > 0) {
memcpy(address, fFields, header->fields_size);
header->fields_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->fields_size);
address += header->fields_size;
}
if (header->data_size > 0) {
memcpy(address, fData, header->data_size);
header->data_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->data_size);
}
header->shared_area = area;
return B_OK;
}
status_t
BMessage::_CopyForWrite()
{
DEBUG_FUNCTION_ENTER;
if (fHeader->shared_area < B_OK)
return B_OK;
field_header *newFields = NULL;
uint8 *newData = NULL;
if (fHeader->fields_size > 0) {
newFields = (field_header *)malloc(fHeader->fields_size);
memcpy(newFields, fFields, fHeader->fields_size);
}
if (fHeader->data_size > 0) {
newData = (uint8 *)malloc(fHeader->data_size);
memcpy(newData, fData, fHeader->data_size);
}
_Dereference();
fHeader->fields_available = 0;
fHeader->data_available = 0;
fFields = newFields;
fData = newData;
return B_OK;
}
status_t
BMessage::_Reference(message_header *header)
{
DEBUG_FUNCTION_ENTER;
_Clear();
fHeader = (message_header *)malloc(sizeof(message_header));
memcpy(fHeader, header, sizeof(message_header));
fHeader->fields_available = 0;
fHeader->data_available = 0;
what = fHeader->what;
if (fHeader->shared_area < B_OK) {
/* if there is no data at all we don't need the area */
if (fHeader->fields_size == 0 && header->data_size == 0)
return B_OK;
_InitHeader();
return B_BAD_DATA;
}
uint8 *address = NULL;
area_id clone = clone_area("Cloned BMessage data", (void **)&address,
B_ANY_ADDRESS, B_READ_AREA, fHeader->shared_area);
if (clone < B_OK) {
_InitHeader();
return clone;
}
fHeader->shared_area = clone;
fFields = (field_header *)address;
address += fHeader->fields_size;
fData = address;
return B_OK;
}
status_t
BMessage::_Dereference()
{
DEBUG_FUNCTION_ENTER;
delete_area(fHeader->shared_area);
fHeader->shared_area = -1;
fFields = NULL;
fData = NULL;
return B_OK;
}
status_t
BMessage::_ResizeData(int32 offset, int32 change)
{
@ -1558,7 +1559,7 @@ BMessage::AddData(const char *name, type_code type, const void *data,
if (numBytes <= 0 || !data)
return B_BAD_VALUE;
if (fHeader->shared_area >= B_OK)
if (fClonedArea >= B_OK)
_CopyForWrite();
field_header *field = NULL;
@ -1600,6 +1601,9 @@ BMessage::RemoveData(const char *name, int32 index)
if (index < 0)
return B_BAD_VALUE;
if (fClonedArea >= B_OK)
_CopyForWrite();
field_header *field = NULL;
status_t result = _FindField(name, B_ANY_TYPE, &field);
@ -1612,9 +1616,6 @@ BMessage::RemoveData(const char *name, int32 index)
if (index >= field->count)
return B_BAD_INDEX;
if (fHeader->shared_area >= B_OK)
_CopyForWrite();
if (field->count == 1)
return _RemoveField(field);
@ -1645,6 +1646,9 @@ status_t
BMessage::RemoveName(const char *name)
{
DEBUG_FUNCTION_ENTER;
if (fClonedArea >= B_OK)
_CopyForWrite();
field_header *field = NULL;
status_t result = _FindField(name, B_ANY_TYPE, &field);
@ -1654,9 +1658,6 @@ BMessage::RemoveName(const char *name)
if (!field)
return B_ERROR;
if (fHeader->shared_area >= B_OK)
_CopyForWrite();
return _RemoveField(field);
}
@ -1717,6 +1718,9 @@ BMessage::ReplaceData(const char *name, type_code type, int32 index,
if (numBytes <= 0 || !data)
return B_BAD_VALUE;
if (fClonedArea >= B_OK)
_CopyForWrite();
field_header *field = NULL;
status_t result = _FindField(name, type, &field);
@ -1729,9 +1733,6 @@ BMessage::ReplaceData(const char *name, type_code type, int32 index,
if (index >= field->count)
return B_BAD_INDEX;
if (fHeader->shared_area >= B_OK)
_CopyForWrite();
if (field->flags & FIELD_FLAG_FIXED_SIZE) {
ssize_t size = field->data_size / field->count;
if (size != numBytes)
@ -1849,27 +1850,24 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
char *buffer = NULL;
message_header *header = NULL;
status_t result;
int32 code;
if (/*fHeader->fields_size + fHeader->data_size > B_PAGE_SIZE*/false) {
result = _FlattenToArea(&header);
buffer = (char *)header;
size = sizeof(message_header);
code = kPortMessageCodeByArea;
} else {
size = _NativeFlattenedSize();
buffer = (char *)malloc(size);
result = _NativeFlatten(buffer, size);
header = (message_header *)buffer;
code = kPortMessageCodeDirect;
}
if (result < B_OK)
return result;
if (!replyTo.IsValid()) {
BMessenger::Private(replyTo).SetTo(fHeader->reply_team,
fHeader->reply_port, fHeader->reply_target);
BMessenger::Private(replyTo).SetTo(header->reply_team,
header->reply_port, header->reply_target);
if (!replyTo.IsValid())
replyTo = be_app_messenger;
@ -1889,7 +1887,7 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout,
header->flags |= MESSAGE_FLAG_WAS_DELIVERED;
do {
result = write_port_etc(port, code, (void *)buffer, size,
result = write_port_etc(port, kPortMessageCode, (void *)buffer, size,
B_RELATIVE_TIMEOUT, timeout);
} while (result == B_INTERRUPTED);
@ -1995,7 +1993,7 @@ BMessage::_SendFlattenedMessage(void *data, int32 size, port_id port,
status_t result;
do {
result = write_port_etc(port, kPortMessageCodeDirect, data, size,
result = write_port_etc(port, kPortMessageCode, data, size,
B_RELATIVE_TIMEOUT, timeout);
} while (result == B_INTERRUPTED);
@ -2022,19 +2020,12 @@ handle_reply(port_id replyPort, int32 *pCode, bigtime_t timeout,
result = read_port(replyPort, pCode, buffer, size);
} while (result == B_INTERRUPTED);
if (result < B_OK || (*pCode != kPortMessageCodeDirect
&& *pCode != kPortMessageCodeByArea)) {
if (result < B_OK || *pCode != kPortMessageCode) {
free(buffer);
return (result < B_OK ? result : B_ERROR);
}
if (*pCode == kPortMessageCodeDirect) {
result = reply->Unflatten(buffer);
} else if (*pCode == kPortMessageCodeByArea) {
BMessage::Private messagePrivate(reply);
result = messagePrivate.Reference((BMessage::message_header *)buffer);
}
result = reply->Unflatten(buffer);
free(buffer);
return result;
}