finished notification support

git-svn-id: file:///srv/svn/repos/haiku/trunk/current@1345 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
beveloper 2002-10-02 16:54:53 +00:00
parent 1b7f115063
commit e66fda405a
6 changed files with 243 additions and 68 deletions

View File

@ -22,20 +22,17 @@ struct request_data;
status_t SendToServer(BMessage *msg);
status_t QueryServer(BMessage *request, BMessage *reply);
// BMessage based data exchange with the media_addon_server
status_t SendToAddonServer(BMessage *msg);
// Raw data based data exchange with the media_server
status_t SendToServer(int32 msgcode, void *msg, int size);
status_t QueryServer(int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize);
status_t QueryServer(int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize);
// Raw data based data exchange with the media_addon_server
status_t SendToAddonServer(int32 msgcode, void *msg, int size);
status_t QueryAddonServer(int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize);
status_t QueryAddonServer(int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize);
// Raw data based data exchange with the media_server
status_t SendToPort(port_id sendport, int32 msgcode, void *msg, int size);
status_t QueryPort(port_id requestport, int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize);
status_t QueryPort(port_id requestport, int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize);
// The base struct used for all raw requests
struct request_data

View File

@ -4,7 +4,13 @@
*/
#include <OS.h>
#include <Messenger.h>
#include "debug.h"
#include "PortPool.h"
#include "DataExchange.h"
#include "ServerInterface.h" // NEW_MEDIA_SERVER_SIGNATURE
#define TIMEOUT 100000
namespace BPrivate {
namespace media {
@ -13,67 +19,112 @@ team_id team;
namespace dataexchange {
BMessenger *MediaServerMessenger;
static port_id MediaServerPort;
static port_id MediaAddonServerPort;
class initit
{
public:
initit()
{
MediaServerMessenger = new BMessenger(NEW_MEDIA_SERVER_SIGNATURE);
MediaServerPort = find_port("media_server port");
MediaAddonServerPort = find_port("media_addon_server port");
}
~initit()
{
delete MediaServerMessenger;
}
};
initit _initit;
void
request_data::SendReply(reply_data *reply, int replysize) const
{
SendToPort(reply_port, 0, reply, replysize);
}
// BMessage based data exchange with the media_server
status_t SendToServer(BMessage *msg)
{
return B_OK;
status_t rv;
rv = MediaServerMessenger->SendMessage(msg, static_cast<BHandler *>(NULL), TIMEOUT);
if (rv != B_OK)
TRACE("SendToServer: SendMessage failed\n");
return rv;
}
status_t QueryServer(BMessage *request, BMessage *reply)
{
return B_OK;
}
// BMessage based data exchange with the media_addon_server
status_t SendToAddonServer(BMessage *msg)
{
return B_OK;
status_t rv;
rv = MediaServerMessenger->SendMessage(request, reply, TIMEOUT, TIMEOUT);
if (rv != B_OK)
TRACE("QueryServer: SendMessage failed\n");
return rv;
}
// Raw data based data exchange with the media_server
status_t SendToServer(int32 msgcode, void *msg, int size)
{
return B_OK;
return SendToPort(MediaServerPort, msgcode, msg, size);
}
status_t QueryServer(int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize)
status_t QueryServer(int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize)
{
return B_OK;
return QueryPort(MediaServerPort, msgcode, request, requestsize, reply, replysize);
}
// Raw data based data exchange with the media_addon_server
status_t SendToAddonServer(int32 msgcode, void *msg, int size)
{
return B_OK;
return SendToPort(MediaAddonServerPort, msgcode, msg, size);
}
status_t QueryAddonServer(int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize)
status_t QueryAddonServer(int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize)
{
return B_OK;
return QueryPort(MediaAddonServerPort, msgcode, request, requestsize, reply, replysize);
}
// Raw data based data exchange with the media_server
status_t SendToPort(port_id sendport, int32 msgcode, void *msg, int size)
{
status_t rv;
rv = write_port(sendport, msgcode, msg, size);
if (rv != B_OK)
TRACE("SendToPort: write_port failed\n");
return B_OK;
}
status_t QueryPort(port_id requestport, int32 msgcode, const request_data *request, int requestsize, reply_data *reply, int replysize)
status_t QueryPort(port_id requestport, int32 msgcode, request_data *request, int requestsize, reply_data *reply, int replysize)
{
return B_OK;
status_t rv;
int32 code;
request->reply_port = _PortPool->GetPort();
rv = write_port(requestport, msgcode, request, requestsize);
if (rv != B_OK) {
TRACE("QueryPort: write_port failed\n");
_PortPool->PutPort(request->reply_port);
return rv;
}
rv = read_port(request->reply_port, &code, reply, replysize);
_PortPool->PutPort(request->reply_port);
if (rv < B_OK)
TRACE("QueryPort: read_port failed\n");
return (rv < B_OK) ? rv : reply->result;
}
}; // dataexchange

View File

@ -32,7 +32,7 @@ namespace BPrivate { namespace media {
class _DefaultDeleter
{
public:
void Delete() { delete BMediaRoster::_sDefault; }
~_DefaultDeleter() { delete BMediaRoster::_sDefault; }
};
_DefaultDeleter _deleter;

View File

@ -4,25 +4,23 @@
#include <Messenger.h>
#include <MediaNode.h>
#include <Debug.h>
#include "NodeManager.h"
#include "DataExchange.h"
#include "Notifications.h"
#include "NotificationManager.h"
#include "Queue.h"
#define NOTIFICATION_THREAD_PRIORITY 19
extern NodeManager *gNodeManager;
struct RegisteredHandler
{
BMessenger messenger;
media_node_id nodeid;
int32 mask;
team_id team;
};
#define NOTIFICATION_THREAD_PRIORITY 19
#define TIMEOUT 100000
NotificationManager::NotificationManager()
: fNotificationQueue(new Queue),
fNotificationThreadId(-1),
fLocker(new BLocker)
fLocker(new BLocker),
fNotificationList(new List<Notification>)
{
fNotificationThreadId = spawn_thread(NotificationManager::worker_thread, "notification broadcast", NOTIFICATION_THREAD_PRIORITY, this);
resume_thread(fNotificationThreadId);
@ -36,6 +34,7 @@ NotificationManager::~NotificationManager()
wait_for_thread(fNotificationThreadId, &dummy);
delete fNotificationQueue;
delete fLocker;
delete fNotificationList;
}
void
@ -58,8 +57,23 @@ NotificationManager::RequestNotifications(BMessage *msg)
msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team);
msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what);
msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &nodesize);
ASSERT(nodesize == sizeof(node));
ASSERT(nodesize == sizeof(media_node));
Notification n;
n.messenger = messenger;
n.node = *node;
n.what = what;
n.team = team;
fLocker->Lock();
fNotificationList->Insert(n);
fLocker->Unlock();
// send the initial B_MEDIA_NODE_CREATED containing all existing live nodes
BMessage initmsg(B_MEDIA_NODE_CREATED);
if (B_OK == gNodeManager->GetLiveNodes(&initmsg)) {
messenger.SendMessage(&initmsg, static_cast<BHandler *>(NULL), TIMEOUT);
}
}
void
@ -75,23 +89,122 @@ NotificationManager::CancelNotifications(BMessage *msg)
msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team);
msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what);
msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &nodesize);
ASSERT(nodesize == sizeof(node));
ASSERT(nodesize == sizeof(media_node));
/* if what == B_MEDIA_WILDCARD && node == media_node::null
* => delete all notifications for the matching team & messenger
* else if what != B_MEDIA_WILDCARD && node == media_node::null
* => delete all notifications for the matching what & team & messenger
* else if what == B_MEDIA_WILDCARD && node != media_node::null
* => delete all notifications for the matching team & messenger & node
* else if what != B_MEDIA_WILDCARD && node != media_node::null
* => delete all notifications for the matching what & team & messenger & node
*/
fLocker->Lock();
Notification n;
for (int32 index = 0; fNotificationList->GetAt(index, &n); index++) {
bool remove;
if (what == B_MEDIA_WILDCARD && *node == media_node::null && team == n.team && messenger == n.messenger)
remove = true;
else if (what != B_MEDIA_WILDCARD && *node == media_node::null && what == n.what && team == n.team && messenger == n.messenger)
remove = true;
else if (what == B_MEDIA_WILDCARD && *node != media_node::null && team == n.team && messenger == n.messenger && n.node == *node)
remove = true;
else if (what != B_MEDIA_WILDCARD && *node != media_node::null && what == n.what && team == n.team && messenger == n.messenger && n.node == *node)
remove = true;
else
remove = false;
if (remove) {
if (fNotificationList->Remove(index)) {
index--;
} else {
ASSERT(false);
}
}
}
fLocker->Unlock();
}
void
NotificationManager::SendNotifications(BMessage *msg)
{
const media_source *source;
const media_destination *destination;
const media_node *node;
ssize_t size;
int32 what;
msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what);
msg->RemoveName(NOTIFICATION_PARAM_WHAT);
msg->what = what;
fLocker->Lock();
Notification n;
for (int32 index = 0; fNotificationList->GetAt(index, &n); index++) {
if (n.what != B_MEDIA_WILDCARD && n.what != what)
continue;
switch (what) {
case B_MEDIA_NODE_CREATED:
case B_MEDIA_NODE_DELETED:
case B_MEDIA_CONNECTION_MADE:
case B_MEDIA_CONNECTION_BROKEN:
case B_MEDIA_BUFFER_CREATED:
case B_MEDIA_BUFFER_DELETED:
case B_MEDIA_TRANSPORT_STATE:
case B_MEDIA_DEFAULT_CHANGED:
case B_MEDIA_FLAVORS_CHANGED:
if (n.node != media_node::null)
continue;
break;
case B_MEDIA_NEW_PARAMETER_VALUE:
case B_MEDIA_PARAMETER_CHANGED:
case B_MEDIA_NODE_STOPPED:
case B_MEDIA_WEB_CHANGED:
msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &size);
ASSERT(size == sizeof(media_node));
if (n.node != *node)
continue;
break;
case B_MEDIA_FORMAT_CHANGED:
msg->FindData("source", B_RAW_TYPE, reinterpret_cast<const void **>(&source), &size);
ASSERT(size == sizeof(media_source));
msg->FindData("destination", B_RAW_TYPE, reinterpret_cast<const void **>(&destination), &size);
ASSERT(size == sizeof(media_destination));
if (n.node.port != source->port && n.node.port != destination->port)
continue;
break;
}
n.messenger.SendMessage(msg, static_cast<BHandler *>(NULL), TIMEOUT);
}
fLocker->Unlock();
}
void
NotificationManager::CleanupTeam(team_id team)
{
}
fLocker->Lock();
void
NotificationManager::BroadcastMessages(BMessage *msg)
{
Notification n;
for (int32 index = 0; fNotificationList->GetAt(index, &n); index++) {
if (n.team == team) {
if (fNotificationList->Remove(index)) {
index--;
} else {
ASSERT(false);
}
}
}
fLocker->Unlock();
}
void

View File

@ -1,6 +1,17 @@
#include <MediaNode.h>
#include "TList.h"
class Queue;
struct Notification
{
BMessenger messenger;
media_node node;
int32 what;
team_id team;
};
class NotificationManager
{
public:
@ -16,7 +27,6 @@ private:
void CancelNotifications(BMessage *msg);
void SendNotifications(BMessage *msg);
void BroadcastMessages(BMessage *msg);
void WorkerThread();
static int32 worker_thread(void *arg);
@ -24,4 +34,5 @@ private:
Queue * fNotificationQueue;
thread_id fNotificationThreadId;
BLocker * fLocker;
List<Notification> *fNotificationList;
};

View File

@ -31,6 +31,11 @@
*
*/
NotificationManager *gNotificationManager;
BufferManager *gBufferManager;
AppManager *gAppManager;
NodeManager *gNodeManager;
#define REPLY_TIMEOUT ((bigtime_t)500000)
@ -107,10 +112,6 @@ private:
port_id control_port;
thread_id control_thread;
NotificationManager *fNotificationManager;
BufferManager *fBufferManager;
AppManager *fAppManager;
NodeManager *fNodeManager;
BLocker *fLocker;
float fVolumeLeft;
@ -122,10 +123,6 @@ private:
ServerApp::ServerApp()
: BApplication(NEW_MEDIA_SERVER_SIGNATURE),
fNotificationManager(new NotificationManager),
fBufferManager(new BufferManager),
fAppManager(new AppManager),
fNodeManager(new NodeManager),
fLocker(new BLocker("server locker")),
fVolumeLeft(0.0),
fVolumeRight(0.0)
@ -133,6 +130,12 @@ ServerApp::ServerApp()
//load volume settings from config file
//mVolumeLeft = ???;
//mVolumeRight = ???;
gNotificationManager = new NotificationManager;
gBufferManager = new BufferManager;
gAppManager = new AppManager;
gNodeManager = new NodeManager;
control_port = create_port(64,"media_server port");
control_thread = spawn_thread(controlthread,"media_server control",12,this);
resume_thread(control_thread);
@ -140,10 +143,10 @@ ServerApp::ServerApp()
ServerApp::~ServerApp()
{
delete fNotificationManager;
delete fBufferManager;
delete fAppManager;
delete fNodeManager;
delete gNotificationManager;
delete gBufferManager;
delete gAppManager;
delete gNodeManager;
delete fLocker;
delete_port(control_port);
status_t err;
@ -160,7 +163,7 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
xfer_server_get_mediaaddon_ref *msg = (xfer_server_get_mediaaddon_ref *)data;
xfer_server_get_mediaaddon_ref_reply reply;
entry_ref tempref;
reply.result = fNodeManager->GetAddonRef(&tempref, msg->addonid);
reply.result = gNodeManager->GetAddonRef(&tempref, msg->addonid);
reply.ref = tempref;
write_port(msg->reply_port, 0, &reply, sizeof(reply));
break;
@ -188,7 +191,7 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
{
xfer_server_register_mediaaddon *msg = (xfer_server_register_mediaaddon *)data;
xfer_server_register_mediaaddon_reply reply;
fNodeManager->RegisterAddon(msg->ref, &reply.addonid);
gNodeManager->RegisterAddon(msg->ref, &reply.addonid);
write_port(msg->reply_port, 0, &reply, sizeof(reply));
break;
}
@ -196,7 +199,7 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
case SERVER_UNREGISTER_MEDIAADDON:
{
xfer_server_unregister_mediaaddon *msg = (xfer_server_unregister_mediaaddon *)data;
fNodeManager->UnregisterAddon(msg->addonid);
gNodeManager->UnregisterAddon(msg->addonid);
break;
}
@ -205,10 +208,10 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
xfer_server_register_dormant_node *msg = (xfer_server_register_dormant_node *)data;
dormant_flavor_info dfi;
if (msg->purge_id > 0)
fNodeManager->RemoveDormantFlavorInfo(msg->purge_id);
gNodeManager->RemoveDormantFlavorInfo(msg->purge_id);
rv = dfi.Unflatten(msg->dfi_type, &(msg->dfi), msg->dfi_size);
ASSERT(rv == B_OK);
fNodeManager->AddDormantFlavorInfo(dfi);
gNodeManager->AddDormantFlavorInfo(dfi);
break;
}
@ -218,7 +221,7 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
xfer_server_get_dormant_nodes_reply reply;
dormant_node_info * infos = new dormant_node_info[msg->maxcount];
reply.count = msg->maxcount;
reply.result = fNodeManager->GetDormantNodes(
reply.result = gNodeManager->GetDormantNodes(
infos,
&reply.count,
msg->has_input ? &msg->inputformat : NULL,
@ -241,7 +244,7 @@ ServerApp::HandleMessage(int32 code, void *data, size_t size)
dormant_flavor_info dfi;
status_t rv;
rv = fNodeManager->GetDormantFlavorInfoFor(msg->addon, msg->flavor_id, &dfi);
rv = gNodeManager->GetDormantFlavorInfoFor(msg->addon, msg->flavor_id, &dfi);
if (rv != B_OK) {
xfer_server_get_dormant_flavor_info_reply reply;
reply.result = rv;
@ -285,7 +288,7 @@ void
ServerApp::GetSharedBufferArea(BMessage *msg)
{
BMessage reply(B_OK);
reply.AddInt32("area",fBufferManager->SharedBufferListID());
reply.AddInt32("area",gBufferManager->SharedBufferListID());
msg->SendReply(&reply,(BHandler*)NULL,REPLY_TIMEOUT);
}
@ -312,9 +315,9 @@ ServerApp::RegisterBuffer(BMessage *msg)
//TRACE("ServerApp::RegisterBuffer team = 0x%08x, areaid = 0x%08x, offset = 0x%08x, size = 0x%08x, flags = 0x%08x, buffer = 0x%08x\n",(int)teamid,(int)area,(int)offset,(int)size,(int)flags,(int)bufferid);
if (bufferid == 0)
status = fBufferManager->RegisterBuffer(teamid, size, flags, offset, area, &bufferid);
status = gBufferManager->RegisterBuffer(teamid, size, flags, offset, area, &bufferid);
else
status = fBufferManager->RegisterBuffer(teamid, bufferid, &size, &flags, &offset, &area);
status = gBufferManager->RegisterBuffer(teamid, bufferid, &size, &flags, &offset, &area);
BMessage reply(status);
reply.AddInt32("buffer",bufferid);
@ -336,7 +339,7 @@ ServerApp::UnregisterBuffer(BMessage *msg)
teamid = msg->FindInt32("team");
bufferid = msg->FindInt32("buffer");
status = fBufferManager->UnregisterBuffer(teamid, bufferid);
status = gBufferManager->UnregisterBuffer(teamid, bufferid);
BMessage reply(status);
msg->SendReply(&reply,(BHandler*)NULL,REPLY_TIMEOUT);
@ -412,7 +415,7 @@ void ServerApp::RegisterApp(BMessage *msg)
{
team_id team;
msg->FindInt32("team", &team);
fAppManager->RegisterTeam(team, msg->ReturnAddress());
gAppManager->RegisterTeam(team, msg->ReturnAddress());
BMessage reply(B_OK);
msg->SendReply(&reply,(BHandler*)NULL,REPLY_TIMEOUT);
@ -423,7 +426,7 @@ void ServerApp::UnregisterApp(BMessage *msg)
{
team_id team;
msg->FindInt32("team", &team);
fAppManager->UnregisterTeam(team);
gAppManager->UnregisterTeam(team);
BMessage reply(B_OK);
msg->SendReply(&reply,(BHandler*)NULL,REPLY_TIMEOUT);
@ -548,9 +551,9 @@ void ServerApp::MessageReceived(BMessage *msg)
case MEDIA_SERVER_GET_SHARED_BUFFER_AREA: GetSharedBufferArea(msg); break;
case MEDIA_SERVER_REGISTER_BUFFER: RegisterBuffer(msg); break;
case MEDIA_SERVER_UNREGISTER_BUFFER: UnregisterBuffer(msg); break;
case MEDIA_SERVER_REQUEST_NOTIFICATIONS: fNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_CANCEL_NOTIFICATIONS: fNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_SEND_NOTIFICATIONS: fNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_REQUEST_NOTIFICATIONS: gNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_CANCEL_NOTIFICATIONS: gNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_SEND_NOTIFICATIONS: gNotificationManager->EnqueueMessage(msg); break;
case MEDIA_SERVER_GET_NODE_ID: GetNodeID(msg); break;