NetServices: Rewrite the control thread loop to prepare for filtering of requests to do

Change-Id: I72d5f1ddd499f8fb227af5ee78111a60eb50b1f5
This commit is contained in:
Niels Sascha Reedijk 2022-07-24 22:07:22 +01:00
parent 60355daec9
commit 251828d308

View File

@ -8,6 +8,7 @@
#include <algorithm>
#include <deque>
#include <list>
#include <map>
#include <optional>
#include <vector>
@ -45,14 +46,6 @@ using namespace BPrivate::Network;
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
/*!
\brief Newline sequence
As per the RFC, defined as \r\n
*/
static constexpr std::array<std::byte, 2> kNewLine = {std::byte('\r'), std::byte('\n')};
class BHttpSession::Request {
public:
Request(BHttpRequest&& request,
@ -123,10 +116,6 @@ private:
// Redirection
std::optional<BHttpStatus> fRedirectStatus;
int8 fRemainingRedirects;
// Optional decompression
std::unique_ptr<BMallocIO> fDecompressorStorage = nullptr;
std::unique_ptr<BDataIO> fDecompressingStream = nullptr;
};
@ -145,6 +134,8 @@ private:
static status_t ControlThreadFunc(void* arg);
static status_t DataThreadFunc(void* arg);
// Helper functions
std::vector<BHttpSession::Request> GetRequestsForControlThread();
private:
// constants (can be accessed unlocked)
const sem_id fControlQueueSem;
@ -156,8 +147,8 @@ private:
BLocker fLock;
int32 fQuitting;
// queues
std::deque<BHttpSession::Request> fControlQueue;
// queues & shared data
std::list<BHttpSession::Request> fControlQueue;
std::deque<BHttpSession::Request> fDataQueue;
std::vector<int32> fCancelList;
@ -233,17 +224,17 @@ BHttpSession::Impl::Cancel(int32 identifier)
std::cout << "BHttpSession::Impl::Cancel for " << identifier << std::endl;
auto lock = AutoLocker<BLocker>(fLock);
// Check if the item is on the control queue
auto item = std::find_if(fControlQueue.begin(), fControlQueue.end(),
[&identifier](const auto& arg){ return arg.Id() == identifier; });
if (item != fControlQueue.end()) {
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
} catch (...) {
item->SetError(std::current_exception());
fControlQueue.remove_if([&identifier](auto& request){
if (request.Id() == identifier) {
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
} catch (...) {
request.SetError(std::current_exception());
}
return true;
}
fControlQueue.erase(item);
return;
}
return false;
});
// Get it on the list for deletion in the data queue
fCancelList.push_back(identifier);
@ -265,49 +256,37 @@ BHttpSession::Impl::ControlThreadFunc(void* arg)
break;
}
// Inner loop to process items on the queue
while (true) {
impl->fLock.Lock();
if (impl->fControlQueue.empty() || atomic_get(&impl->fQuitting) == 1) {
impl->fLock.Unlock();
break;
}
auto request = std::move(impl->fControlQueue.front());
impl->fControlQueue.pop_front();
impl->fLock.Unlock();
// Check if we have woken up because we are quitting
if (atomic_get(&impl->fQuitting) == 1)
break;
// Get items to process (locking done by the helper)
auto requests = impl->GetRequestsForControlThread();
if (requests.size() == 0)
continue;
for (auto& request: requests) {
std::cout << "ControlThreadFunc(): processing request " << request.Id() << std::endl;
switch (request.State()) {
case Request::InitialState:
{
bool hasError = false;
try {
request.ResolveHostName();
request.OpenConnection();
} catch (...) {
std::cout << "ControlThreadFunc()[" << request.Id() << "] error resolving/connecting" << std::endl;
request.SetError(std::current_exception());
hasError = true;
}
if (hasError) {
// Do not add the request back to the queue
break;
}
impl->fLock.Lock();
impl->fDataQueue.push_back(std::move(request));
impl->fLock.Unlock();
release_sem(impl->fDataQueueSem);
break;
}
default:
{
// not handled at this stage
break;
}
bool hasError = false;
try {
request.ResolveHostName();
request.OpenConnection();
} catch (...) {
std::cout << "ControlThreadFunc()[" << request.Id() << "] error resolving/connecting" << std::endl;
request.SetError(std::current_exception());
hasError = true;
}
if (hasError) {
// Do not add the request back to the queue
continue;
}
impl->fLock.Lock();
impl->fDataQueue.push_back(std::move(request));
impl->fLock.Unlock();
release_sem(impl->fDataQueueSem);
}
}
@ -533,6 +512,25 @@ BHttpSession::Impl::DataThreadFunc(void* arg)
}
/*!
\brief Internal helper that filters the lists of requests to guard against the concurrent
requests limit.
This method will do the locking of the internal structure.
*/
std::vector<BHttpSession::Request>
BHttpSession::Impl::GetRequestsForControlThread()
{
std::vector<BHttpSession::Request> requests;
auto lock = AutoLocker<BLocker>(fLock);
fControlQueue.remove_if([this, &requests](auto& request){
requests.emplace_back(std::move(request));
return true;
});
return requests;
}
// #pragma mark -- BHttpSession (public interface)