rtsp_streamer run loop in a separate thread

* Other general improvements.
* Clarify the exit policy.
* Call NotifyError in shutdownStream.
This commit is contained in:
Dario Casalinuovo 2016-06-04 15:38:10 +02:00
parent d004f81304
commit 0957c8215e
3 changed files with 93 additions and 33 deletions

View File

@ -16,31 +16,46 @@ RTSPMediaIO::RTSPMediaIO(BUrl* ourUrl)
B_MEDIA_STREAMING | B_MEDIA_MUTABLE_SIZE | B_MEDIA_SEEK_BACKWARD,
B_INFINITE_TIMEOUT),
fUrl(ourUrl),
fInitErr(B_OK),
fClient(NULL),
fInputAdapter(NULL),
fScheduler(NULL),
fEnv(NULL),
loopWatchVariable(0)
fLoopWatchVariable(0),
fLoopThread(-1),
fInitErr(B_OK)
{
fInputAdapter = BuildInputAdapter();
fScheduler = BasicTaskScheduler::createNew();
fEnv = BasicUsageEnvironment::createNew(*fScheduler);
HaikuRTSPClient* client = new HaikuRTSPClient(*fEnv, fUrl->UrlString(),
0, BuildInputAdapter());
if (client == NULL) {
fClient = new HaikuRTSPClient(*fEnv, fUrl->UrlString(),
0, this);
if (fClient == NULL) {
fInitErr = B_ERROR;
return;
}
client->sendDescribeCommand(continueAfterDESCRIBE);
fClient->sendDescribeCommand(continueAfterDESCRIBE);
fEnv->taskScheduler().doEventLoop(&loopWatchVariable);
fLoopThread = spawn_thread(_LoopThread, "two minutes hate thread",
B_NORMAL_PRIORITY, this);
fInitErr = client->WaitForInit(5000000);
if (fLoopThread <= 0 || resume_thread(fLoopThread) != B_OK) {
fInitErr = B_ERROR;
return;
}
fInitErr = fClient->WaitForInit(5000000);
}
RTSPMediaIO::~RTSPMediaIO()
{
fClient->Close();
ShutdownLoop();
if (fLoopThread != -1)
exit_thread(fLoopThread);
}
@ -51,6 +66,13 @@ RTSPMediaIO::InitCheck() const
}
void
RTSPMediaIO::ShutdownLoop()
{
fLoopWatchVariable = 1;
}
ssize_t
RTSPMediaIO::WriteAt(off_t position, const void* buffer, size_t size)
{
@ -58,8 +80,24 @@ RTSPMediaIO::WriteAt(off_t position, const void* buffer, size_t size)
}
int32
RTSPMediaIO::_LoopThread(void* data)
{
static_cast<RTSPMediaIO *>(data)->LoopThread();
return 0;
}
void
RTSPMediaIO::LoopThread()
{
fEnv->taskScheduler().doEventLoop(&fLoopWatchVariable);
fLoopThread = -1;
}
HaikuRTSPClient::HaikuRTSPClient(UsageEnvironment& env, char const* rtspURL,
portNumBits tunnelOverHTTPPortNum, BInputAdapter* inputAdapter)
portNumBits tunnelOverHTTPPortNum, RTSPMediaIO* interface)
:
RTSPClient(env, rtspURL, LIVE555_VERBOSITY, "Haiku RTSP Streamer",
tunnelOverHTTPPortNum, -1),
@ -68,12 +106,30 @@ HaikuRTSPClient::HaikuRTSPClient(UsageEnvironment& env, char const* rtspURL,
subsession(NULL),
streamTimerTask(NULL),
duration(0.0f),
fInputAdapter(inputAdapter)
fInterface(interface),
fInitPort(-1)
{
fInitPort = create_port(1, "RTSP Client wait port");
}
HaikuRTSPClient::~HaikuRTSPClient()
{
}
void
HaikuRTSPClient::Close()
{
delete iter;
if (session != NULL) {
UsageEnvironment& env = session->envir();
env.taskScheduler().unscheduleDelayedTask(streamTimerTask);
Medium::close(session);
}
}
status_t
HaikuRTSPClient::WaitForInit(bigtime_t timeout)
{
@ -90,6 +146,7 @@ HaikuRTSPClient::WaitForInit(bigtime_t timeout)
void
HaikuRTSPClient::NotifyError()
{
fInterface->ShutdownLoop();
status_t status = B_ERROR;
write_port(fInitPort, NULL, &status, sizeof(status));
}
@ -103,19 +160,8 @@ HaikuRTSPClient::NotifySucces()
}
HaikuRTSPClient::~HaikuRTSPClient()
{
delete iter;
if (session != NULL) {
UsageEnvironment& env = session->envir();
env.taskScheduler().unscheduleDelayedTask(streamTimerTask);
Medium::close(session);
}
}
BInputAdapter*
HaikuRTSPClient::GetInputAdapter() const
{
return fInputAdapter;
return fInterface->BuildInputAdapter();
}

View File

@ -12,6 +12,9 @@
#include "rtsp.h"
class HaikuRTSPClient;
class RTSPMediaIO : public BAdapterIO
{
public:
@ -23,12 +26,22 @@ public:
virtual ssize_t WriteAt(off_t position,
const void* buffer,
size_t size);
private:
BUrl* fUrl;
TaskScheduler* fScheduler;
void LoopThread();
void ShutdownLoop();
private:
static int32 _LoopThread(void* data);
BUrl* fUrl;
BInputAdapter* fInputAdapter;
HaikuRTSPClient* fClient;
UsageEnvironment* fEnv;
char loopWatchVariable;
TaskScheduler* fScheduler;
char fLoopWatchVariable;
thread_id fLoopThread;
status_t fInitErr;
};
@ -40,7 +53,9 @@ public:
HaikuRTSPClient(UsageEnvironment& env,
char const* rtspURL,
portNumBits tunnelOverHTTPPortNum,
BInputAdapter* fInputAdapter);
RTSPMediaIO* fInputAdapter);
void Close();
BInputAdapter* GetInputAdapter() const;
@ -50,10 +65,9 @@ public:
void NotifySucces();
protected:
virtual ~HaikuRTSPClient();
public:
MediaSubsessionIterator* iter;
MediaSession* session;
MediaSubsession* subsession;
@ -61,7 +75,8 @@ public:
double duration;
private:
BInputAdapter* fInputAdapter;
RTSPMediaIO* fInterface;
port_id fInitPort;
};

View File

@ -125,7 +125,6 @@ void continueAfterDESCRIBE(RTSPClient* rtspClient,
return;
} while (0);
client->NotifyError();
// An unrecoverable error occurred with this stream.
shutdownStream(rtspClient);
}
@ -289,7 +288,6 @@ void continueAfterPLAY(RTSPClient* rtspClient,
delete[] resultString;
if (!success) {
client->NotifyError();
// An unrecoverable error occurred with this stream.
shutdownStream(rtspClient);
} else
@ -383,6 +381,7 @@ void shutdownStream(RTSPClient* rtspClient, int exitCode)
Medium::close(rtspClient);
// Note that this will also cause this stream's
// "StreamClientState" structure to get reclaimed.
client->NotifyError();
}