From 251828d308081d7b93535ae8df3dae85c6a505b7 Mon Sep 17 00:00:00 2001 From: Niels Sascha Reedijk Date: Sun, 24 Jul 2022 22:07:22 +0100 Subject: [PATCH] NetServices: Rewrite the control thread loop to prepare for filtering of requests to do Change-Id: I72d5f1ddd499f8fb227af5ee78111a60eb50b1f5 --- .../network/libnetservices2/HttpSession.cpp | 124 +++++++++--------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/src/kits/network/libnetservices2/HttpSession.cpp b/src/kits/network/libnetservices2/HttpSession.cpp index 07b703e68c..f84939a578 100644 --- a/src/kits/network/libnetservices2/HttpSession.cpp +++ b/src/kits/network/libnetservices2/HttpSession.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -45,14 +46,6 @@ using namespace BPrivate::Network; static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024; -/*! - \brief Newline sequence - - As per the RFC, defined as \r\n -*/ -static constexpr std::array kNewLine = {std::byte('\r'), std::byte('\n')}; - - class BHttpSession::Request { public: Request(BHttpRequest&& request, @@ -123,10 +116,6 @@ private: // Redirection std::optional fRedirectStatus; int8 fRemainingRedirects; - - // Optional decompression - std::unique_ptr fDecompressorStorage = nullptr; - std::unique_ptr fDecompressingStream = nullptr; }; @@ -145,6 +134,8 @@ private: static status_t ControlThreadFunc(void* arg); static status_t DataThreadFunc(void* arg); + // Helper functions + std::vector GetRequestsForControlThread(); private: // constants (can be accessed unlocked) const sem_id fControlQueueSem; @@ -156,8 +147,8 @@ private: BLocker fLock; int32 fQuitting; - // queues - std::deque fControlQueue; + // queues & shared data + std::list fControlQueue; std::deque fDataQueue; std::vector fCancelList; @@ -233,17 +224,17 @@ BHttpSession::Impl::Cancel(int32 identifier) std::cout << "BHttpSession::Impl::Cancel for " << identifier << std::endl; auto lock = AutoLocker(fLock); // Check if the item is on the control queue - auto item = std::find_if(fControlQueue.begin(), fControlQueue.end(), - [&identifier](const auto& arg){ return arg.Id() == identifier; }); - if (item != fControlQueue.end()) { - try { - throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); - } catch (...) { - item->SetError(std::current_exception()); + fControlQueue.remove_if([&identifier](auto& request){ + if (request.Id() == identifier) { + try { + throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); + } catch (...) { + request.SetError(std::current_exception()); + } + return true; } - fControlQueue.erase(item); - return; - } + return false; + }); // Get it on the list for deletion in the data queue fCancelList.push_back(identifier); @@ -265,49 +256,37 @@ BHttpSession::Impl::ControlThreadFunc(void* arg) break; } - // Inner loop to process items on the queue - while (true) { - impl->fLock.Lock(); - if (impl->fControlQueue.empty() || atomic_get(&impl->fQuitting) == 1) { - impl->fLock.Unlock(); - break; - } - auto request = std::move(impl->fControlQueue.front()); - impl->fControlQueue.pop_front(); - impl->fLock.Unlock(); + // Check if we have woken up because we are quitting + if (atomic_get(&impl->fQuitting) == 1) + break; + // Get items to process (locking done by the helper) + auto requests = impl->GetRequestsForControlThread(); + if (requests.size() == 0) + continue; + + for (auto& request: requests) { std::cout << "ControlThreadFunc(): processing request " << request.Id() << std::endl; - switch (request.State()) { - case Request::InitialState: - { - bool hasError = false; - try { - request.ResolveHostName(); - request.OpenConnection(); - } catch (...) { - std::cout << "ControlThreadFunc()[" << request.Id() << "] error resolving/connecting" << std::endl; - request.SetError(std::current_exception()); - hasError = true; - } - - if (hasError) { - // Do not add the request back to the queue - break; - } - - impl->fLock.Lock(); - impl->fDataQueue.push_back(std::move(request)); - impl->fLock.Unlock(); - release_sem(impl->fDataQueueSem); - break; - } - default: - { - // not handled at this stage - break; - } + bool hasError = false; + try { + request.ResolveHostName(); + request.OpenConnection(); + } catch (...) { + std::cout << "ControlThreadFunc()[" << request.Id() << "] error resolving/connecting" << std::endl; + request.SetError(std::current_exception()); + hasError = true; } + + if (hasError) { + // Do not add the request back to the queue + continue; + } + + impl->fLock.Lock(); + impl->fDataQueue.push_back(std::move(request)); + impl->fLock.Unlock(); + release_sem(impl->fDataQueueSem); } } @@ -533,6 +512,25 @@ BHttpSession::Impl::DataThreadFunc(void* arg) } +/*! + \brief Internal helper that filters the lists of requests to guard against the concurrent + requests limit. + + This method will do the locking of the internal structure. +*/ +std::vector +BHttpSession::Impl::GetRequestsForControlThread() +{ + std::vector requests; + auto lock = AutoLocker(fLock); + fControlQueue.remove_if([this, &requests](auto& request){ + requests.emplace_back(std::move(request)); + return true; + }); + return requests; +} + + // #pragma mark -- BHttpSession (public interface)