From 69904185194cae567421c9fedc1cb041e7f0aa45 Mon Sep 17 00:00:00 2001 From: Michael Lotz Date: Thu, 23 Feb 2006 09:02:03 +0000 Subject: [PATCH] * Fixed some more bugs in the message passing by area * Reordered some functions * Area messages can now be unflattened instead of using the private _Reference and a special port code Passing by area is now mostly working but it's not yet enabled. I will have to conduct performance tests first to see if and starting at what messagesize the overhead is reasonable. git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@16500 a95241bf-73f2-0310-859d-f6bbb57e9c96 --- headers/os/app/Message4.h | 3 +- headers/private/app/MessagePrivate4.h | 12 +- src/kits/app/Looper.cpp | 13 - src/kits/app/Message4.cpp | 469 +++++++++++++------------- 4 files changed, 235 insertions(+), 262 deletions(-) diff --git a/headers/os/app/Message4.h b/headers/os/app/Message4.h index 90a59f2e88..fecf16545e 100644 --- a/headers/os/app/Message4.h +++ b/headers/os/app/Message4.h @@ -291,13 +291,14 @@ class BMessage { message_header* fHeader; field_header* fFields; uint8* fData; + area_id fClonedArea; mutable BMessage* fOriginal; BMessage* fQueueLink; // fQueueLink is used by BMessageQueue to build a linked list - uint32 fReserved[11]; + uint32 fReserved[10]; // deprecated BMessage(BMessage *message); diff --git a/headers/private/app/MessagePrivate4.h b/headers/private/app/MessagePrivate4.h index 8f07187abb..b944be1097 100644 --- a/headers/private/app/MessagePrivate4.h +++ b/headers/private/app/MessagePrivate4.h @@ -20,8 +20,7 @@ #define MAX_ITEM_PREALLOCATION B_PAGE_SIZE -static const int32 kPortMessageCodeDirect = 'pjpp'; -static const int32 kPortMessageCodeByArea = 'pjp2'; +static const int32 kPortMessageCode = 'pjpp'; enum { @@ -31,7 +30,8 @@ enum { MESSAGE_FLAG_IS_REPLY = 0x0008, MESSAGE_FLAG_WAS_DELIVERED = 0x0010, MESSAGE_FLAG_HAS_SPECIFIERS = 0x0020, - MESSAGE_FLAG_WAS_DROPPED = 0x0080 + MESSAGE_FLAG_WAS_DROPPED = 0x0080, + MESSAGE_FLAG_PASS_BY_AREA = 0x0100 }; @@ -182,12 +182,6 @@ class BMessage::Private { return fMessage->_FlattenToArea(header); } - status_t - Reference(message_header *header) - { - return fMessage->_Reference(header); - } - status_t SendMessage(port_id port, int32 token, bigtime_t timeout, bool replyRequired, BMessenger &replyTo) const diff --git a/src/kits/app/Looper.cpp b/src/kits/app/Looper.cpp index 61d0a4aa29..ea3542575d 100644 --- a/src/kits/app/Looper.cpp +++ b/src/kits/app/Looper.cpp @@ -1171,19 +1171,6 @@ BLooper::ConvertToMessage(void *buffer, int32 code) return NULL; BMessage *message = new BMessage(); -#ifdef USING_MESSAGE4 - if (code == kPortMessageCodeByArea) { - BMessage::Private messagePrivate(message); - if (messagePrivate.Reference((BMessage::message_header *)buffer) != B_OK) { - PRINT(("BLooper::ConvertToMessage(): referencing message failed\n")); - delete message; - message = NULL; - } - - return message; - } -#endif - if (message->Unflatten((const char *)buffer) != B_OK) { PRINT(("BLooper::ConvertToMessage(): unflattening message failed\n")); delete message; diff --git a/src/kits/app/Message4.cpp b/src/kits/app/Message4.cpp index 65af334870..6d193d68dd 100644 --- a/src/kits/app/Message4.cpp +++ b/src/kits/app/Message4.cpp @@ -161,6 +161,8 @@ BMessage::_InitCommon() fFields = NULL; fData = NULL; + fClonedArea = -1; + fOriginal = NULL; fQueueLink = NULL; return B_OK; @@ -196,7 +198,7 @@ status_t BMessage::_Clear() { DEBUG_FUNCTION_ENTER; - if (fHeader && fHeader->shared_area >= B_OK) + if (fClonedArea >= B_OK) _Dereference(); free(fHeader); @@ -532,7 +534,7 @@ BMessage::Rename(const char *oldEntry, const char *newEntry) if (!oldEntry || !newEntry) return B_BAD_VALUE; - if (fHeader->shared_area >= B_OK) + if (fClonedArea >= B_OK) _CopyForWrite(); uint32 hash = _HashName(oldEntry) % fHeader->hash_table_size; @@ -861,6 +863,155 @@ BMessage::_NativeFlatten(BDataIO *stream, ssize_t *size) const } +/* The concept of message sending by area: + + The traditional way of sending a message is to send it by flattening it to + a buffer, pushing it through a port, reading it into the outputbuffer and + unflattening it from there (copying the data again). While this works ok + for small messages it does not make any sense for larger ones and may even + hit some port capacity limit. + Often in the life of a BMessage, it will be sent to someone. Almost as + often the one receiving the message will not need to change the message + in any way, but uses it "read only" to get information from it. This means + that all that copying is pretty pointless in the first place since we + could simply pass the original buffers on. + It's obviously not exactly as simple as this, since we cannot just use the + memory of one application in another - but we can share areas with + eachother. + Therefore instead of flattening into a buffer, we copy the message data + into an area, put this information into the message header and only push + this through the port. The receiving looper then builds a BMessage from + the header, that only references the data in the area (not copying it), + allowing read only access to it. + Only if write access is necessary the message will be copyed from the area + to its own buffers (like in the unflatten step before). + The double copying is reduced to a single copy in most cases and we safe + the slower route of moving the data through a port. + Additionally we save us the reference counting with the use of areas that + are reference counted internally. So we don't have to worry about leaving + an area behind or deleting one that is still in use. +*/ + +status_t +BMessage::_FlattenToArea(message_header **_header) const +{ + DEBUG_FUNCTION_ENTER; + message_header *header = (message_header *)malloc(sizeof(message_header)); + memcpy(header, fHeader, sizeof(message_header)); + + header->what = what; + header->fields_available = 0; + header->data_available = 0; + header->flags |= MESSAGE_FLAG_PASS_BY_AREA; + *_header = header; + + if (header->shared_area >= B_OK) + return B_OK; + + if (header->fields_size == 0 && header->data_size == 0) + return B_OK; + + uint8 *address = NULL; + ssize_t size = header->fields_size + header->data_size; + size = (size + B_PAGE_SIZE) & ~(B_PAGE_SIZE - 1); + area_id area = create_area("Shared BMessage data", (void **)&address, + B_ANY_ADDRESS, size, B_NO_LOCK, B_READ_AREA | B_WRITE_AREA); + + if (area < B_OK) { + free(header); + *_header = NULL; + return area; + } + + if (header->fields_size > 0) { + memcpy(address, fFields, header->fields_size); + header->fields_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->fields_size); + address += header->fields_size; + } + + if (header->data_size > 0) { + memcpy(address, fData, header->data_size); + header->data_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->data_size); + } + + header->shared_area = area; + return B_OK; +} + + +status_t +BMessage::_Reference(message_header *header) +{ + DEBUG_FUNCTION_ENTER; + fHeader = header; + fHeader->flags &= ~MESSAGE_FLAG_PASS_BY_AREA; + + /* if there is no data at all we don't need the area */ + if (fHeader->fields_size == 0 && header->data_size == 0) + return B_OK; + + uint8 *address = NULL; + area_id clone = clone_area("Cloned BMessage data", (void **)&address, + B_ANY_ADDRESS, B_READ_AREA, fHeader->shared_area); + + if (clone < B_OK) { + free(fHeader); + fHeader = NULL; + _InitHeader(); + return clone; + } + + fClonedArea = clone; + fFields = (field_header *)address; + address += fHeader->fields_size; + fData = address; + return B_OK; +} + + +status_t +BMessage::_Dereference() +{ + DEBUG_FUNCTION_ENTER; + delete_area(fClonedArea); + fClonedArea = -1; + fFields = NULL; + fData = NULL; + return B_OK; +} + + +status_t +BMessage::_CopyForWrite() +{ + DEBUG_FUNCTION_ENTER; + if (fClonedArea < B_OK) + return B_OK; + + field_header *newFields = NULL; + uint8 *newData = NULL; + + if (fHeader->fields_size > 0) { + newFields = (field_header *)malloc(fHeader->fields_size); + memcpy(newFields, fFields, fHeader->fields_size); + } + + if (fHeader->data_size > 0) { + newData = (uint8 *)malloc(fHeader->data_size); + memcpy(newData, fData, fHeader->data_size); + } + + _Dereference(); + + fHeader->fields_available = 0; + fHeader->data_available = 0; + + fFields = newFields; + fData = newData; + return B_OK; +} + + status_t BMessage::Unflatten(const char *flatBuffer) { @@ -903,38 +1054,46 @@ BMessage::Unflatten(const char *flatBuffer) memcpy(fHeader, flatBuffer, sizeof(message_header)); flatBuffer += sizeof(message_header); - fHeader->shared_area = -1; - fHeader->fields_available = 0; - fHeader->data_available = 0; - what = fHeader->what; - - if (fHeader->format != kMessageMagic4 || - !(fHeader->flags & MESSAGE_FLAG_VALID)) { - _Clear(); + if (fHeader->format != kMessageMagic4 + || !(fHeader->flags & MESSAGE_FLAG_VALID)) { + free(fHeader); + fHeader = NULL; _InitHeader(); return B_BAD_VALUE; } - if (fHeader->fields_size > 0) { - fFields = (field_header *)malloc(fHeader->fields_size); - if (!fFields) - return B_NO_MEMORY; + fHeader->fields_available = 0; + fHeader->data_available = 0; + what = fHeader->what; - memcpy(fFields, flatBuffer, fHeader->fields_size); - flatBuffer += fHeader->fields_size; - } + if (fHeader->flags & MESSAGE_FLAG_PASS_BY_AREA) { + status_t result = _Reference(fHeader); + if (result < B_OK) + return result; + } else { + fHeader->shared_area = -1; - if (fHeader->data_size > 0) { - fData = (uint8 *)malloc(fHeader->data_size); - if (!fData) - return B_NO_MEMORY; + if (fHeader->fields_size > 0) { + fFields = (field_header *)malloc(fHeader->fields_size); + if (!fFields) + return B_NO_MEMORY; - memcpy(fData, flatBuffer, fHeader->data_size); + memcpy(fFields, flatBuffer, fHeader->fields_size); + flatBuffer += fHeader->fields_size; + } + + if (fHeader->data_size > 0) { + fData = (uint8 *)malloc(fHeader->data_size); + if (!fData) + return B_NO_MEMORY; + + memcpy(fData, flatBuffer, fHeader->data_size); + } } if (fHeader->fields_checksum != BPrivate::CalculateChecksum((uint8 *)fFields, fHeader->fields_size) || fHeader->data_checksum != BPrivate::CalculateChecksum((uint8 *)fData, fHeader->data_size)) { - debug_printf("checksum mismatch\n"); + debug_printf("checksum mismatch 2\n"); _Clear(); _InitHeader(); return B_BAD_VALUE; @@ -975,54 +1134,52 @@ BMessage::Unflatten(BDataIO *stream) uint8 *header = (uint8 *)fHeader; ssize_t result = stream->Read(header + sizeof(uint32), sizeof(message_header) - sizeof(uint32)); + result -= sizeof(message_header) - sizeof(uint32); - fHeader->shared_area = -1; - fHeader->fields_available = 0; - fHeader->data_available = 0; - what = fHeader->what; - - if (result != sizeof(message_header) - sizeof(uint32)) { - _Clear(); - _InitHeader(); - return (result >= 0 ? B_ERROR : result); - } - - if (fHeader->format != kMessageMagic4 || - !(fHeader->flags & MESSAGE_FLAG_VALID)) { - _Clear(); + if (result != B_OK || fHeader->format != kMessageMagic4 + || !(fHeader->flags & MESSAGE_FLAG_VALID)) { + free(fHeader); + fHeader = NULL; _InitHeader(); return B_BAD_VALUE; } - if (fHeader->fields_size > 0) { - fFields = (field_header *)malloc(fHeader->fields_size); - if (!fFields) - return B_NO_MEMORY; + fHeader->fields_available = 0; + fHeader->data_available = 0; + what = fHeader->what; - result = stream->Read(fFields, fHeader->fields_size); - if (result != fHeader->fields_size) { - _Clear(); - _InitHeader(); - return (result >= 0 ? B_ERROR : result); + if (fHeader->flags & MESSAGE_FLAG_PASS_BY_AREA) { + result = _Reference(fHeader); + if (result < B_OK) + return result; + } else { + fHeader->shared_area = -1; + + if (result == B_OK && fHeader->fields_size > 0) { + fFields = (field_header *)malloc(fHeader->fields_size); + if (!fFields) + return B_NO_MEMORY; + + result = stream->Read(fFields, fHeader->fields_size); + result -= fHeader->fields_size; } - } - if (fHeader->data_size > 0) { - fData = (uint8 *)malloc(fHeader->data_size); - if (!fData) - return B_NO_MEMORY; + if (result == B_OK && fHeader->data_size > 0) { + fData = (uint8 *)malloc(fHeader->data_size); + if (!fData) + return B_NO_MEMORY; - result = stream->Read(fData, fHeader->data_size); - if (result != fHeader->data_size) { - _Clear(); - _InitHeader(); - return (result >= 0 ? B_ERROR : result); + result = stream->Read(fData, fHeader->data_size); + result -= fHeader->data_size; } + + if (result < B_OK) + return B_BAD_VALUE; } if (fHeader->fields_checksum != BPrivate::CalculateChecksum((uint8 *)fFields, fHeader->fields_size) || fHeader->data_checksum != BPrivate::CalculateChecksum((uint8 *)fData, fHeader->data_size)) { - debug_printf("checksum mismatch\n"); + debug_printf("checksum mismatch 2\n"); _Clear(); _InitHeader(); return B_BAD_VALUE; @@ -1191,162 +1348,6 @@ BMessage::PopSpecifier() } -/* The concept of message sending by area: - - The traditional way of sending a message is to send it by flattening it to - a buffer, pushing it through a port, reading it into the outputbuffer and - unflattening it from there (copying the data again). While this works ok - for small messages it does not make any sense for larger ones and may even - hit some port capacity limit. - Often in the life of a BMessage, it will be sent to someone. Almost as - often the one receiving the message will not need to change the message - in any way, but uses it "read only" to get information from it. This means - that all that copying is pretty pointless in the first place since we - could simply pass the original buffers on. - It's obviously not exactly as simple as this, since we cannot just use the - memory of one application in another - but we can share areas with - eachother. - Therefore instead of flattening into a buffer, we copy the message data - into an area, put this information into the message header and only push - this through the port. The receiving looper then builds a BMessage from - the header, that only references the data in the area (not copying it), - allowing read only access to it. - Only if write access is necessary the message will be copyed from the area - to its own buffers (like in the unflatten step before). - The double copying is reduced to a single copy in most cases and we safe - the slower route of moving the data through a port. - Additionally we save us the reference counting with the use of areas that - are reference counted internally. So we don't have to worry about leaving - an area behind or deleting one that is still in use. -*/ - -status_t -BMessage::_FlattenToArea(message_header **_header) const -{ - DEBUG_FUNCTION_ENTER; - message_header *header = (message_header *)malloc(sizeof(message_header)); - memcpy(header, fHeader, sizeof(message_header)); - - header->what = what; - header->fields_available = 0; - header->data_available = 0; - *_header = header; - - if (header->shared_area >= B_OK) - return B_OK; - - if (header->fields_size == 0 && header->data_size == 0) - return B_OK; - - uint8 *address = NULL; - ssize_t size = header->fields_size + header->data_size; - size = (size + B_PAGE_SIZE) & ~(B_PAGE_SIZE - 1); - area_id area = create_area("Shared BMessage data", (void **)&address, - B_ANY_ADDRESS, size, B_NO_LOCK, B_READ_AREA | B_WRITE_AREA); - - if (area < B_OK) { - free(header); - *_header = NULL; - return area; - } - - if (header->fields_size > 0) { - memcpy(address, fFields, header->fields_size); - header->fields_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->fields_size); - address += header->fields_size; - } - - if (header->data_size > 0) { - memcpy(address, fData, header->data_size); - header->data_checksum = BPrivate::CalculateChecksum((uint8 *)address, header->data_size); - } - - header->shared_area = area; - return B_OK; -} - - -status_t -BMessage::_CopyForWrite() -{ - DEBUG_FUNCTION_ENTER; - if (fHeader->shared_area < B_OK) - return B_OK; - - field_header *newFields = NULL; - uint8 *newData = NULL; - - if (fHeader->fields_size > 0) { - newFields = (field_header *)malloc(fHeader->fields_size); - memcpy(newFields, fFields, fHeader->fields_size); - } - - if (fHeader->data_size > 0) { - newData = (uint8 *)malloc(fHeader->data_size); - memcpy(newData, fData, fHeader->data_size); - } - - _Dereference(); - - fHeader->fields_available = 0; - fHeader->data_available = 0; - - fFields = newFields; - fData = newData; - return B_OK; -} - - -status_t -BMessage::_Reference(message_header *header) -{ - DEBUG_FUNCTION_ENTER; - _Clear(); - - fHeader = (message_header *)malloc(sizeof(message_header)); - memcpy(fHeader, header, sizeof(message_header)); - fHeader->fields_available = 0; - fHeader->data_available = 0; - what = fHeader->what; - - if (fHeader->shared_area < B_OK) { - /* if there is no data at all we don't need the area */ - if (fHeader->fields_size == 0 && header->data_size == 0) - return B_OK; - - _InitHeader(); - return B_BAD_DATA; - } - - uint8 *address = NULL; - area_id clone = clone_area("Cloned BMessage data", (void **)&address, - B_ANY_ADDRESS, B_READ_AREA, fHeader->shared_area); - - if (clone < B_OK) { - _InitHeader(); - return clone; - } - - fHeader->shared_area = clone; - fFields = (field_header *)address; - address += fHeader->fields_size; - fData = address; - return B_OK; -} - - -status_t -BMessage::_Dereference() -{ - DEBUG_FUNCTION_ENTER; - delete_area(fHeader->shared_area); - fHeader->shared_area = -1; - fFields = NULL; - fData = NULL; - return B_OK; -} - - status_t BMessage::_ResizeData(int32 offset, int32 change) { @@ -1558,7 +1559,7 @@ BMessage::AddData(const char *name, type_code type, const void *data, if (numBytes <= 0 || !data) return B_BAD_VALUE; - if (fHeader->shared_area >= B_OK) + if (fClonedArea >= B_OK) _CopyForWrite(); field_header *field = NULL; @@ -1600,6 +1601,9 @@ BMessage::RemoveData(const char *name, int32 index) if (index < 0) return B_BAD_VALUE; + if (fClonedArea >= B_OK) + _CopyForWrite(); + field_header *field = NULL; status_t result = _FindField(name, B_ANY_TYPE, &field); @@ -1612,9 +1616,6 @@ BMessage::RemoveData(const char *name, int32 index) if (index >= field->count) return B_BAD_INDEX; - if (fHeader->shared_area >= B_OK) - _CopyForWrite(); - if (field->count == 1) return _RemoveField(field); @@ -1645,6 +1646,9 @@ status_t BMessage::RemoveName(const char *name) { DEBUG_FUNCTION_ENTER; + if (fClonedArea >= B_OK) + _CopyForWrite(); + field_header *field = NULL; status_t result = _FindField(name, B_ANY_TYPE, &field); @@ -1654,9 +1658,6 @@ BMessage::RemoveName(const char *name) if (!field) return B_ERROR; - if (fHeader->shared_area >= B_OK) - _CopyForWrite(); - return _RemoveField(field); } @@ -1717,6 +1718,9 @@ BMessage::ReplaceData(const char *name, type_code type, int32 index, if (numBytes <= 0 || !data) return B_BAD_VALUE; + if (fClonedArea >= B_OK) + _CopyForWrite(); + field_header *field = NULL; status_t result = _FindField(name, type, &field); @@ -1729,9 +1733,6 @@ BMessage::ReplaceData(const char *name, type_code type, int32 index, if (index >= field->count) return B_BAD_INDEX; - if (fHeader->shared_area >= B_OK) - _CopyForWrite(); - if (field->flags & FIELD_FLAG_FIXED_SIZE) { ssize_t size = field->data_size / field->count; if (size != numBytes) @@ -1849,27 +1850,24 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout, char *buffer = NULL; message_header *header = NULL; status_t result; - int32 code; if (/*fHeader->fields_size + fHeader->data_size > B_PAGE_SIZE*/false) { result = _FlattenToArea(&header); buffer = (char *)header; size = sizeof(message_header); - code = kPortMessageCodeByArea; } else { size = _NativeFlattenedSize(); buffer = (char *)malloc(size); result = _NativeFlatten(buffer, size); header = (message_header *)buffer; - code = kPortMessageCodeDirect; } if (result < B_OK) return result; if (!replyTo.IsValid()) { - BMessenger::Private(replyTo).SetTo(fHeader->reply_team, - fHeader->reply_port, fHeader->reply_target); + BMessenger::Private(replyTo).SetTo(header->reply_team, + header->reply_port, header->reply_target); if (!replyTo.IsValid()) replyTo = be_app_messenger; @@ -1889,7 +1887,7 @@ BMessage::_SendMessage(port_id port, int32 token, bigtime_t timeout, header->flags |= MESSAGE_FLAG_WAS_DELIVERED; do { - result = write_port_etc(port, code, (void *)buffer, size, + result = write_port_etc(port, kPortMessageCode, (void *)buffer, size, B_RELATIVE_TIMEOUT, timeout); } while (result == B_INTERRUPTED); @@ -1995,7 +1993,7 @@ BMessage::_SendFlattenedMessage(void *data, int32 size, port_id port, status_t result; do { - result = write_port_etc(port, kPortMessageCodeDirect, data, size, + result = write_port_etc(port, kPortMessageCode, data, size, B_RELATIVE_TIMEOUT, timeout); } while (result == B_INTERRUPTED); @@ -2022,19 +2020,12 @@ handle_reply(port_id replyPort, int32 *pCode, bigtime_t timeout, result = read_port(replyPort, pCode, buffer, size); } while (result == B_INTERRUPTED); - if (result < B_OK || (*pCode != kPortMessageCodeDirect - && *pCode != kPortMessageCodeByArea)) { + if (result < B_OK || *pCode != kPortMessageCode) { free(buffer); return (result < B_OK ? result : B_ERROR); } - if (*pCode == kPortMessageCodeDirect) { - result = reply->Unflatten(buffer); - } else if (*pCode == kPortMessageCodeByArea) { - BMessage::Private messagePrivate(reply); - result = messagePrivate.Reference((BMessage::message_header *)buffer); - } - + result = reply->Unflatten(buffer); free(buffer); return result; }