From 0957c8215e22e1b4392b0f61a39caae09929e47e Mon Sep 17 00:00:00 2001 From: Dario Casalinuovo Date: Sat, 4 Jun 2016 15:38:10 +0200 Subject: [PATCH] rtsp_streamer run loop in a separate thread * Other general improvements. * Clarify the exit policy. * Call NotifyError in shutdownStream. --- .../plugins/rtsp_streamer/RTSPMediaIO.cpp | 92 ++++++++++++++----- .../media/plugins/rtsp_streamer/RTSPMediaIO.h | 31 +++++-- .../media/plugins/rtsp_streamer/rtsp.cpp | 3 +- 3 files changed, 93 insertions(+), 33 deletions(-) diff --git a/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.cpp b/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.cpp index dcfa0fc8a1..4b31fccd4b 100644 --- a/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.cpp +++ b/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.cpp @@ -16,31 +16,46 @@ RTSPMediaIO::RTSPMediaIO(BUrl* ourUrl) B_MEDIA_STREAMING | B_MEDIA_MUTABLE_SIZE | B_MEDIA_SEEK_BACKWARD, B_INFINITE_TIMEOUT), fUrl(ourUrl), - fInitErr(B_OK), + fClient(NULL), + fInputAdapter(NULL), fScheduler(NULL), - fEnv(NULL), - loopWatchVariable(0) + fLoopWatchVariable(0), + fLoopThread(-1), + fInitErr(B_OK) { + fInputAdapter = BuildInputAdapter(); fScheduler = BasicTaskScheduler::createNew(); fEnv = BasicUsageEnvironment::createNew(*fScheduler); - HaikuRTSPClient* client = new HaikuRTSPClient(*fEnv, fUrl->UrlString(), - 0, BuildInputAdapter()); - if (client == NULL) { + fClient = new HaikuRTSPClient(*fEnv, fUrl->UrlString(), + 0, this); + if (fClient == NULL) { fInitErr = B_ERROR; return; } - client->sendDescribeCommand(continueAfterDESCRIBE); + fClient->sendDescribeCommand(continueAfterDESCRIBE); - fEnv->taskScheduler().doEventLoop(&loopWatchVariable); + fLoopThread = spawn_thread(_LoopThread, "two minutes hate thread", + B_NORMAL_PRIORITY, this); - fInitErr = client->WaitForInit(5000000); + if (fLoopThread <= 0 || resume_thread(fLoopThread) != B_OK) { + fInitErr = B_ERROR; + return; + } + + fInitErr = fClient->WaitForInit(5000000); } RTSPMediaIO::~RTSPMediaIO() { + fClient->Close(); + + ShutdownLoop(); + + if (fLoopThread != -1) + exit_thread(fLoopThread); } @@ -51,6 +66,13 @@ RTSPMediaIO::InitCheck() const } +void +RTSPMediaIO::ShutdownLoop() +{ + fLoopWatchVariable = 1; +} + + ssize_t RTSPMediaIO::WriteAt(off_t position, const void* buffer, size_t size) { @@ -58,8 +80,24 @@ RTSPMediaIO::WriteAt(off_t position, const void* buffer, size_t size) } +int32 +RTSPMediaIO::_LoopThread(void* data) +{ + static_cast(data)->LoopThread(); + return 0; +} + + +void +RTSPMediaIO::LoopThread() +{ + fEnv->taskScheduler().doEventLoop(&fLoopWatchVariable); + fLoopThread = -1; +} + + HaikuRTSPClient::HaikuRTSPClient(UsageEnvironment& env, char const* rtspURL, - portNumBits tunnelOverHTTPPortNum, BInputAdapter* inputAdapter) + portNumBits tunnelOverHTTPPortNum, RTSPMediaIO* interface) : RTSPClient(env, rtspURL, LIVE555_VERBOSITY, "Haiku RTSP Streamer", tunnelOverHTTPPortNum, -1), @@ -68,12 +106,30 @@ HaikuRTSPClient::HaikuRTSPClient(UsageEnvironment& env, char const* rtspURL, subsession(NULL), streamTimerTask(NULL), duration(0.0f), - fInputAdapter(inputAdapter) + fInterface(interface), + fInitPort(-1) { fInitPort = create_port(1, "RTSP Client wait port"); } +HaikuRTSPClient::~HaikuRTSPClient() +{ +} + + +void +HaikuRTSPClient::Close() +{ + delete iter; + if (session != NULL) { + UsageEnvironment& env = session->envir(); + env.taskScheduler().unscheduleDelayedTask(streamTimerTask); + Medium::close(session); + } +} + + status_t HaikuRTSPClient::WaitForInit(bigtime_t timeout) { @@ -90,6 +146,7 @@ HaikuRTSPClient::WaitForInit(bigtime_t timeout) void HaikuRTSPClient::NotifyError() { + fInterface->ShutdownLoop(); status_t status = B_ERROR; write_port(fInitPort, NULL, &status, sizeof(status)); } @@ -103,19 +160,8 @@ HaikuRTSPClient::NotifySucces() } -HaikuRTSPClient::~HaikuRTSPClient() -{ - delete iter; - if (session != NULL) { - UsageEnvironment& env = session->envir(); - env.taskScheduler().unscheduleDelayedTask(streamTimerTask); - Medium::close(session); - } -} - - BInputAdapter* HaikuRTSPClient::GetInputAdapter() const { - return fInputAdapter; + return fInterface->BuildInputAdapter(); } diff --git a/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.h b/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.h index 097c06304b..f249c67add 100644 --- a/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.h +++ b/src/add-ons/media/plugins/rtsp_streamer/RTSPMediaIO.h @@ -12,6 +12,9 @@ #include "rtsp.h" +class HaikuRTSPClient; + + class RTSPMediaIO : public BAdapterIO { public: @@ -23,12 +26,22 @@ public: virtual ssize_t WriteAt(off_t position, const void* buffer, size_t size); -private: - BUrl* fUrl; - TaskScheduler* fScheduler; + void LoopThread(); + void ShutdownLoop(); +private: + static int32 _LoopThread(void* data); + + BUrl* fUrl; + BInputAdapter* fInputAdapter; + + + HaikuRTSPClient* fClient; UsageEnvironment* fEnv; - char loopWatchVariable; + TaskScheduler* fScheduler; + + char fLoopWatchVariable; + thread_id fLoopThread; status_t fInitErr; }; @@ -40,7 +53,9 @@ public: HaikuRTSPClient(UsageEnvironment& env, char const* rtspURL, portNumBits tunnelOverHTTPPortNum, - BInputAdapter* fInputAdapter); + RTSPMediaIO* fInputAdapter); + + void Close(); BInputAdapter* GetInputAdapter() const; @@ -50,10 +65,9 @@ public: void NotifySucces(); protected: + virtual ~HaikuRTSPClient(); - public: - MediaSubsessionIterator* iter; MediaSession* session; MediaSubsession* subsession; @@ -61,7 +75,8 @@ public: double duration; private: - BInputAdapter* fInputAdapter; + RTSPMediaIO* fInterface; + port_id fInitPort; }; diff --git a/src/add-ons/media/plugins/rtsp_streamer/rtsp.cpp b/src/add-ons/media/plugins/rtsp_streamer/rtsp.cpp index d9ffc6476b..a804523e9d 100644 --- a/src/add-ons/media/plugins/rtsp_streamer/rtsp.cpp +++ b/src/add-ons/media/plugins/rtsp_streamer/rtsp.cpp @@ -125,7 +125,6 @@ void continueAfterDESCRIBE(RTSPClient* rtspClient, return; } while (0); - client->NotifyError(); // An unrecoverable error occurred with this stream. shutdownStream(rtspClient); } @@ -289,7 +288,6 @@ void continueAfterPLAY(RTSPClient* rtspClient, delete[] resultString; if (!success) { - client->NotifyError(); // An unrecoverable error occurred with this stream. shutdownStream(rtspClient); } else @@ -383,6 +381,7 @@ void shutdownStream(RTSPClient* rtspClient, int exitCode) Medium::close(rtspClient); // Note that this will also cause this stream's // "StreamClientState" structure to get reclaimed. + client->NotifyError(); }