NetServices: add HttpBuffer to encapsulate buffered reading

Change-Id: I7ec3189c3cf459fccae4ffac44702fd6b8c12ac9
This commit is contained in:
Niels Sascha Reedijk 2022-07-03 10:13:05 +01:00
parent 6cf088266f
commit a1e7583626
4 changed files with 226 additions and 77 deletions

View File

@ -6,7 +6,9 @@
#ifndef _HTTP_STREAM_H_
#define _HTTP_STREAM_H_
#include <functional>
#include <memory>
#include <optional>
#include <vector>
class BDataIO;
@ -47,11 +49,34 @@ public:
virtual TransferInfo Transfer(BDataIO* target) override;
private:
off_t fRemainingHeaderSize = 0;
BDataIO* fBody = nullptr;
off_t fTotalBodySize = 0;
off_t fBufferedBodySize = 0;
off_t fTransferredBodySize = 0;
off_t fRemainingHeaderSize = 0;
BDataIO* fBody = nullptr;
off_t fTotalBodySize = 0;
off_t fBufferedBodySize = 0;
off_t fTransferredBodySize = 0;
};
class HttpBuffer {
public:
using WriteFunction = std::function<size_t (const std::byte*, size_t)>;
public:
HttpBuffer(size_t capacity = 8*1024);
ssize_t ReadFrom(BDataIO* source);
void WriteExactlyTo(BDataIO* target);
void WriteTo(WriteFunction func);
std::optional<BString> GetNextLine();
size_t RemainingBytes() noexcept;
void Flush() noexcept;
void Clear() noexcept;
private:
std::vector<std::byte> fBuffer;
size_t fCurrentOffset = 0;
};

View File

@ -59,7 +59,7 @@ struct HttpResultPrivate {
void SetStatus(BHttpStatus&& s);
void SetFields(BHttpFields&& f);
void SetBody();
ssize_t WriteToBody(const void* buffer, ssize_t size);
size_t WriteToBody(const void* buffer, size_t size);
};
@ -131,8 +131,8 @@ HttpResultPrivate::SetBody()
}
inline ssize_t
HttpResultPrivate::WriteToBody(const void* buffer, ssize_t size)
inline size_t
HttpResultPrivate::WriteToBody(const void* buffer, size_t size)
{
// TODO: when the support for a shared BMemoryRingIO is here, choose
// between one or the other depending on which one is available.
@ -140,7 +140,10 @@ HttpResultPrivate::WriteToBody(const void* buffer, ssize_t size)
bodyText.Append(static_cast<const char*>(buffer), size);
return size;
}
return ownedBody->Write(buffer, size);
auto result = ownedBody->Write(buffer, size);
if (result < 0)
throw BSystemError("BDataIO::Write()", result);
return result;
}

View File

@ -103,7 +103,6 @@ public:
bool CanCancel() const noexcept { return fResult->CanCancel(); }
private:
std::optional<BString> _GetLine(std::vector<std::byte>::const_iterator& offset);
BHttpStatus _ParseStatus(BString&& statusLine);
private:
@ -125,7 +124,7 @@ private:
fDataStream;
// Receive buffers
std::vector<std::byte> fInputBuffer;
HttpBuffer fBuffer;
// Receive state
off_t fBodyBytesTotal = 0;
@ -761,36 +760,22 @@ BHttpSession::Request::TransferRequest()
bool
BHttpSession::Request::ReceiveResult()
{
ssize_t bytesRead = 0;
bool receiveEnd = false;
// First: stream data from the socket
auto newDataOffset = fInputBuffer.size();
std::cout << "ReceiveResult() [" << Id() << "] before receive fInputBuffer.size() " << fInputBuffer.size() << std::endl;
fInputBuffer.resize(newDataOffset + kMaxReadSize);
bytesRead = fSocket->Read(fInputBuffer.data() + newDataOffset, kMaxReadSize);
auto bytesRead = fBuffer.ReadFrom(fSocket.get());
if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED) {
fInputBuffer.resize(newDataOffset); // revert to previous size of the data in the buffer
return false;
}
if (bytesRead < 0) {
std::cout << "ReceiveResult() [" << Id() << "] BSocket::Read error " << bytesRead << std::endl;
throw BNetworkRequestError("BSocket::Read()", BNetworkRequestError::NetworkError,
bytesRead);
} else if (bytesRead == 0) {
// This may occur when the connection is closed (and the transfer is finished).
// Later on, there is a check to determine whether the request is finished as expected.
receiveEnd = true;
fInputBuffer.resize(newDataOffset); // revert to previous size of the data in buffer
}
fInputBuffer.resize(newDataOffset + bytesRead);
std::cout << "ReceiveResult() [" << Id() << "] after fInputBuffer.size() " << fInputBuffer.size() << std::endl;
std::cout << "ReceiveResult() [" << Id() << "] read " << bytesRead << " from socket" << std::endl;
// Parse the content in the buffer
auto bufferStart = fInputBuffer.cbegin();
switch (fRequestStatus) {
case InitialState:
[[fallthrough]];
@ -799,7 +784,7 @@ BHttpSession::Request::ReceiveResult()
"Read function called for object that is not yet connected or sent");
case RequestSent:
{
auto statusLine = _GetLine(bufferStart);
auto statusLine = fBuffer.GetNextLine();
BHttpStatus status;
if (statusLine) {
@ -862,12 +847,12 @@ BHttpSession::Request::ReceiveResult()
}
case StatusReceived:
{
auto fieldLine = _GetLine(bufferStart);
auto fieldLine = fBuffer.GetNextLine();
while (fieldLine && !fieldLine.value().IsEmpty()){
std::cout << "ReceiveResult() [" << Id() << "] StatusReceived; adding header " << fieldLine.value() << std::endl;
// Parse next header line
fFields.AddField(fieldLine.value());
fieldLine = _GetLine(bufferStart);
fieldLine = fBuffer.GetNextLine();
}
if (fieldLine && fieldLine.value().IsEmpty()){
@ -974,7 +959,7 @@ BHttpSession::Request::ReceiveResult()
{
// TODO: handle chunked transfer
bytesRead = std::distance(bufferStart, fInputBuffer.cend());
bytesRead = fBuffer.RemainingBytes();
fBodyBytesReceived += bytesRead;
std::cout << "ReceiveResult() [" << Id() << "] body bytes current read/total received/total expected: " <<
bytesRead << "/" << fBodyBytesReceived << "/" << fBodyBytesTotal << std::endl;
@ -997,16 +982,11 @@ BHttpSession::Request::ReceiveResult()
// Process the incoming data and write to body
if (bytesRead > 0) {
if (fDecompressingStream) {
auto status = fDecompressingStream->WriteExactly(std::addressof(*bufferStart),
bytesRead);
if (status != B_OK) {
throw BNetworkRequestError("BZlibDecompressionStream::WriteExactly()",
BNetworkRequestError::SystemError, status);
}
fBuffer.WriteExactlyTo(fDecompressingStream.get());
if (receiveEnd) {
// No more bytes expected so flush out the final bytes
if (auto s = fDecompressingStream->Flush(); s != B_OK)
if (auto status = fDecompressingStream->Flush(); status != B_OK)
throw BNetworkRequestError("BZlibDecompressionStream::Flush()",
BNetworkRequestError::SystemError, status);
}
@ -1016,10 +996,10 @@ BHttpSession::Request::ReceiveResult()
fResult->WriteToBody(fDecompressorStorage->Buffer(), bodySize);
fDecompressorStorage->Seek(0, SEEK_SET);
}
bufferStart = fInputBuffer.cend();
} else {
fResult->WriteToBody(std::addressof(*bufferStart), bytesRead);
bufferStart = fInputBuffer.cend();
fBuffer.WriteTo([this](const std::byte* buffer, size_t size) {
return fResult->WriteToBody(buffer, size);
});
}
}
@ -1050,17 +1030,6 @@ BHttpSession::Request::ReceiveResult()
throw BRuntimeError(__PRETTY_FUNCTION__, "To do");
}
// Check if there are any remaining bytes in the inputbuffer
if (bufferStart != fInputBuffer.cend()) {
// move those bytes to the beginning and resize
auto bytesleft = std::distance(bufferStart, fInputBuffer.cend());
std::cout << "ReceiveResult() [" << Id() << "] fInputBuffer bytes left at end: " << bytesleft << std::endl;
std::copy(bufferStart, fInputBuffer.cend(), fInputBuffer.begin());
fInputBuffer.resize(bytesleft);
} else {
fInputBuffer.resize(0);
}
// There is more to receive
return false;
}
@ -1079,25 +1048,6 @@ BHttpSession::Request::Disconnect() noexcept
}
/*!
\brief Get a line from the input buffer
If succesful, the offset will be updated to the next byte after the line
*/
std::optional<BString>
BHttpSession::Request::_GetLine(std::vector<std::byte>::const_iterator& offset)
{
auto result = std::search(offset, fInputBuffer.cend(), kNewLine.cbegin(), kNewLine.cend());
if (result == fInputBuffer.cend())
return std::nullopt;
BString line(reinterpret_cast<const char*>(std::addressof(*offset)), std::distance(offset, result));
offset = result + 2;
std::cout << "_GetLine() [" << Id() << "] " << line << std::endl;
return line;
}
/*!
\brief Parse a HTTP status line, and return a BHttpStatus object on success

View File

@ -8,16 +8,35 @@
#include <HttpStream.h>
#include <algorithm>
#include <optional>
#include <DataIO.h>
#include <HttpRequest.h>
#include <NetServicesDefs.h>
using namespace BPrivate::Network;
// Buffer size constant
constexpr std::size_t kBufferSize = 64 * 1024;
/*!
\brief Size of the internal buffer for reads/writes
Curl 7.82.0 sets the default to 512 kB (524288 bytes)
https://github.com/curl/curl/blob/64db5c575d9c5536bd273a890f50777ad1ca7c13/include/curl/curl.h#L232
Libsoup sets it to 8 kB, though the buffer may grow beyond that if there are leftover bytes.
The absolute maximum seems to be 64 kB (HEADER_SIZE_LIMIT)
https://gitlab.gnome.org/GNOME/libsoup/-/blob/master/libsoup/http1/soup-client-message-io-http1.c#L58
The previous iteration set it to 4 kB, though the input buffer would dynamically grow.
*/
static constexpr ssize_t kMaxBufferSize = 8192;
/*!
\brief Newline sequence
As per the RFC, defined as \r\n
*/
static constexpr std::array<std::byte, 2> kNewLine = {std::byte('\r'), std::byte('\n')};
// #pragma mark -- ByteIOHelper base class
@ -53,7 +72,7 @@ ByteIOHelper::Read(void* buffer, size_t size)
ssize_t
ByteIOHelper::Write(const void* buffer, size_t size)
{
auto remainingSize = kBufferSize - fBuffer.size();
auto remainingSize = kMaxBufferSize - fBuffer.size();
if (remainingSize < 0)
return 0;
@ -81,7 +100,7 @@ ssize_t
BAbstractDataStream::BufferData(BDataIO* source, size_t maxSize)
{
auto currentSize = fBuffer.size();
auto remainingSize = kBufferSize - currentSize;
auto remainingSize = kMaxBufferSize - currentSize;
if (remainingSize < 0)
return B_OK;
@ -138,7 +157,7 @@ BHttpRequestStream::Transfer(BDataIO* target)
return TransferInfo{0, fTotalBodySize, fTotalBodySize, true};
}
if (fBody != nullptr && fBuffer.size() < kBufferSize) {
if (fBody != nullptr && fBuffer.size() < kMaxBufferSize) {
// buffer additional data from the body in the buffer
auto remainingBodySize = fTotalBodySize - fBufferedBodySize;
auto bufferedSize = BufferData(fBody, remainingBodySize);
@ -194,3 +213,155 @@ BHttpRequestStream::Transfer(BDataIO* target)
auto complete = fRemainingHeaderSize == 0 && fTransferredBodySize == fTotalBodySize;
return TransferInfo{bytesWritten, fTransferredBodySize, fTotalBodySize, complete};
}
/*!
\brief Create a new HTTP buffer with \a capacity.
*/
HttpBuffer::HttpBuffer(size_t capacity)
{
fBuffer.reserve(capacity);
};
/*!
\brief Load data from \a source into the spare capacity of this buffer.
\exception BNetworkRequestError When BDataIO::Read() returns any error other than B_WOULD_BLOCK
\retval B_WOULD_BLOCK The read call on the \a source was unsuccessful because it would block.
\retval >=0 The actual number of bytes read.
*/
ssize_t
HttpBuffer::ReadFrom(BDataIO* source)
{
// Remove any unused bytes at the beginning of the buffer
Flush();
auto currentSize = fBuffer.size();
auto remainingBufferSize = fBuffer.capacity() - currentSize;
// Adjust the buffer to the maximum size
fBuffer.resize(fBuffer.capacity());
ssize_t bytesRead = B_INTERRUPTED;
while (bytesRead == B_INTERRUPTED)
bytesRead = source->Read(fBuffer.data() + currentSize, remainingBufferSize);
if (bytesRead == B_WOULD_BLOCK || bytesRead == 0) {
fBuffer.resize(currentSize);
return bytesRead;
} else if (bytesRead < 0) {
throw BNetworkRequestError("BDataIO::Read()", BNetworkRequestError::NetworkError,
bytesRead);
}
// Adjust the buffer to the current size
fBuffer.resize(currentSize + bytesRead);
return bytesRead;
}
/*!
\brief Use BDataIO::WriteExactly() on target to write the contents of the buffer.
*/
void
HttpBuffer::WriteExactlyTo(BDataIO* target)
{
if (RemainingBytes() == 0)
return;
auto status = target->WriteExactly(fBuffer.data() + fCurrentOffset, RemainingBytes());
if (status != B_OK) {
throw BNetworkRequestError("BDataIO::WriteExactly()", BNetworkRequestError::SystemError,
status);
}
// Entire buffer is written; reset internal buffer
Clear();
}
/*!
\brief Write the contents of the buffer through the helper \a func.
\param func Handle the actual writing. The function accepts a pointer and a size as inputs
and should return the number of actual written bytes, which may be fewer than the number
of available bytes.
*/
void
HttpBuffer::WriteTo(WriteFunction func)
{
if (RemainingBytes() == 0)
return;
auto bytesWritten = func(fBuffer.data() + fCurrentOffset, RemainingBytes());
if (bytesWritten > RemainingBytes())
throw BRuntimeError(__PRETTY_FUNCTION__, "More bytes written than were remaining");
fCurrentOffset += bytesWritten;
}
/*!
\brief Get the next line from this buffer.
This can be called iteratively until all lines in the current data are read. After using this
method, you should use Flush() to make sure that the read lines are cleared from the beginning
of the buffer.
\retval std::nullopt There are no more lines in the buffer.
\retval BString The next line.
*/
std::optional<BString>
HttpBuffer::GetNextLine()
{
auto offset = fBuffer.cbegin() + fCurrentOffset;
auto result = std::search(offset, fBuffer.cend(), kNewLine.cbegin(), kNewLine.cend());
if (result == fBuffer.cend())
return std::nullopt;
BString line(reinterpret_cast<const char*>(std::addressof(*offset)), std::distance(offset, result));
fCurrentOffset = std::distance(fBuffer.cbegin(), result) + 2;
return line;
}
/*!
\brief Get the number of remaining bytes in this buffer.
*/
size_t
HttpBuffer::RemainingBytes() noexcept
{
return fBuffer.size() - fCurrentOffset;
}
/*!
\brief Move data to the beginning of the buffer to clear at the back.
The GetNextLine() increases the offset of the internal buffer. This call moves remaining data
to the beginning of the buffer sets the correct size, making the remainder of the capacity
available for further reading.
*/
void
HttpBuffer::Flush() noexcept
{
if (fCurrentOffset > 0) {
auto end = fBuffer.cbegin() + fCurrentOffset;
fBuffer.erase(fBuffer.cbegin(), end);
fCurrentOffset = 0;
}
}
/*!
\brief Clear the internal buffer
*/
void
HttpBuffer::Clear() noexcept
{
fBuffer.clear();
fCurrentOffset = 0;
}