BAdapterIO: Add initial seeking support
* This is by default provided using a relative buffer. When the client request a seek over the range we currently have, we will ask the backend with a SeekRequested. When the backend operation ended successfully, the client is required to call SeekCompleted before to return. This will cause the offset to change and the buffer is cleaned as result. Next data received will be considered to be at a position equal to the offset plus the relative position.
This commit is contained in:
parent
0b20cac7c3
commit
3faf39eb5d
@ -13,6 +13,7 @@
|
||||
|
||||
|
||||
class BAdapterIO;
|
||||
class RelativePositionIO;
|
||||
|
||||
|
||||
class BInputAdapter {
|
||||
@ -62,15 +63,19 @@ protected:
|
||||
|
||||
ssize_t BackWrite(const void* buffer, size_t size);
|
||||
|
||||
status_t SeekRequested(off_t position);
|
||||
status_t SeekCompleted(off_t position);
|
||||
|
||||
private:
|
||||
void _WaitForData(size_t size);
|
||||
status_t _EvaluateWait(off_t position);
|
||||
|
||||
int32 fFlags;
|
||||
bigtime_t fTimeout;
|
||||
|
||||
off_t fBackPosition;
|
||||
mutable RWLocker fLock;
|
||||
BPositionIO* fBuffer;
|
||||
|
||||
RelativePositionIO* fBuffer;
|
||||
|
||||
BInputAdapter* fInputAdapter;
|
||||
|
||||
|
@ -11,6 +11,146 @@
|
||||
#include <stdio.h>
|
||||
|
||||
|
||||
class RelativePositionIO : public BPositionIO
|
||||
{
|
||||
public:
|
||||
RelativePositionIO(BPositionIO* buffer)
|
||||
:
|
||||
BPositionIO(),
|
||||
fStartOffset(0),
|
||||
fTotalSize(0),
|
||||
fBuffer(buffer)
|
||||
{}
|
||||
|
||||
virtual ~RelativePositionIO()
|
||||
{
|
||||
delete fBuffer;
|
||||
}
|
||||
|
||||
void SetTotalSize(size_t size)
|
||||
{
|
||||
fTotalSize = size;
|
||||
}
|
||||
|
||||
size_t TotalSize() const
|
||||
{
|
||||
return fTotalSize;
|
||||
}
|
||||
|
||||
status_t ResetStartOffset(off_t offset)
|
||||
{
|
||||
status_t ret = fBuffer->SetSize(0);
|
||||
if (ret == B_OK)
|
||||
fStartOffset = offset;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
status_t EvaluatePosition(off_t position)
|
||||
{
|
||||
if (position < 0)
|
||||
return B_ERROR;
|
||||
|
||||
if (position < fStartOffset)
|
||||
return B_RESOURCE_UNAVAILABLE;
|
||||
|
||||
// This is an endless stream, we don't know
|
||||
// how much data will come and when, we could
|
||||
// block on that.
|
||||
if (fTotalSize == 0)
|
||||
return B_WOULD_BLOCK;
|
||||
|
||||
if (position >= fTotalSize)
|
||||
return B_ERROR;
|
||||
|
||||
off_t size = 0;
|
||||
fBuffer->GetSize(&size);
|
||||
if (position >= size)
|
||||
return B_RESOURCE_UNAVAILABLE;
|
||||
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
status_t WaitForData(off_t position)
|
||||
{
|
||||
off_t bufferSize = 0;
|
||||
position = _PositionToRelative(position);
|
||||
|
||||
status_t ret = fBuffer->GetSize(&bufferSize);
|
||||
if (ret != B_OK)
|
||||
return B_ERROR;
|
||||
|
||||
while(bufferSize < position) {
|
||||
snooze(100000);
|
||||
fBuffer->GetSize(&bufferSize);
|
||||
}
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
virtual ssize_t ReadAt(off_t position, void* buffer,
|
||||
size_t size)
|
||||
{
|
||||
return fBuffer->ReadAt(
|
||||
_PositionToRelative(position), buffer, size);
|
||||
|
||||
}
|
||||
|
||||
virtual ssize_t WriteAt(off_t position,
|
||||
const void* buffer, size_t size)
|
||||
{
|
||||
return fBuffer->WriteAt(
|
||||
_PositionToRelative(position), buffer, size);
|
||||
}
|
||||
|
||||
virtual off_t Seek(off_t position, uint32 seekMode)
|
||||
{
|
||||
return fBuffer->Seek(_PositionToRelative(position), seekMode);
|
||||
}
|
||||
|
||||
virtual off_t Position() const
|
||||
{
|
||||
return _RelativeToPosition(fBuffer->Position());
|
||||
}
|
||||
|
||||
virtual status_t SetSize(off_t size)
|
||||
{
|
||||
return fBuffer->SetSize(_PositionToRelative(size));
|
||||
}
|
||||
|
||||
virtual status_t GetSize(off_t* size) const
|
||||
{
|
||||
if (fTotalSize > 0) {
|
||||
*size = fTotalSize;
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
off_t bufferSize;
|
||||
status_t ret = fBuffer->GetSize(&bufferSize);
|
||||
if (ret == B_OK)
|
||||
*size = _RelativeToPosition(bufferSize);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
off_t _PositionToRelative(off_t position) const
|
||||
{
|
||||
return position - fStartOffset;
|
||||
}
|
||||
|
||||
off_t _RelativeToPosition(off_t position) const
|
||||
{
|
||||
return position + fStartOffset;
|
||||
}
|
||||
|
||||
off_t fStartOffset;
|
||||
off_t fTotalSize;
|
||||
|
||||
BPositionIO* fBuffer;
|
||||
};
|
||||
|
||||
|
||||
BAdapterIO::BAdapterIO(int32 flags, bigtime_t timeout)
|
||||
:
|
||||
fFlags(flags),
|
||||
@ -19,7 +159,7 @@ BAdapterIO::BAdapterIO(int32 flags, bigtime_t timeout)
|
||||
fBuffer(NULL),
|
||||
fInputAdapter(NULL)
|
||||
{
|
||||
fBuffer = new BMallocIO();
|
||||
fBuffer = new RelativePositionIO(new BMallocIO());
|
||||
}
|
||||
|
||||
|
||||
@ -47,7 +187,10 @@ ssize_t
|
||||
BAdapterIO::ReadAt(off_t position, void* buffer, size_t size)
|
||||
{
|
||||
printf("read at %d %d \n", (int)position, (int)size);
|
||||
_WaitForData(position+size);
|
||||
status_t ret = _EvaluateWait(position+size);
|
||||
if (ret != B_OK)
|
||||
return ret;
|
||||
|
||||
AutoReadLocker _(fLock);
|
||||
|
||||
return fBuffer->ReadAt(position, buffer, size);
|
||||
@ -57,7 +200,10 @@ BAdapterIO::ReadAt(off_t position, void* buffer, size_t size)
|
||||
ssize_t
|
||||
BAdapterIO::WriteAt(off_t position, const void* buffer, size_t size)
|
||||
{
|
||||
_WaitForData(position+size);
|
||||
status_t ret = _EvaluateWait(position+size);
|
||||
if (ret != B_OK)
|
||||
return ret;
|
||||
|
||||
AutoWriteLocker _(fLock);
|
||||
|
||||
return fBuffer->WriteAt(position, buffer, size);
|
||||
@ -67,7 +213,10 @@ BAdapterIO::WriteAt(off_t position, const void* buffer, size_t size)
|
||||
off_t
|
||||
BAdapterIO::Seek(off_t position, uint32 seekMode)
|
||||
{
|
||||
_WaitForData(position);
|
||||
status_t ret = _EvaluateWait(position);
|
||||
if (ret != B_OK)
|
||||
return ret;
|
||||
|
||||
AutoWriteLocker _(fLock);
|
||||
|
||||
return fBuffer->Seek(position, seekMode);
|
||||
@ -101,6 +250,21 @@ BAdapterIO::GetSize(off_t* size) const
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
BAdapterIO::_EvaluateWait(off_t pos)
|
||||
{
|
||||
status_t err = fBuffer->EvaluatePosition(pos);
|
||||
if (err == B_ERROR && err != B_WOULD_BLOCK)
|
||||
return B_ERROR;
|
||||
else if (err == B_RESOURCE_UNAVAILABLE) {
|
||||
if (SeekRequested(pos) != B_OK)
|
||||
return B_UNSUPPORTED;
|
||||
}
|
||||
|
||||
return fBuffer->WaitForData(pos);
|
||||
}
|
||||
|
||||
|
||||
BInputAdapter*
|
||||
BAdapterIO::BuildInputAdapter()
|
||||
{
|
||||
@ -124,19 +288,17 @@ BAdapterIO::BackWrite(const void* buffer, size_t size)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
BAdapterIO::_WaitForData(size_t size)
|
||||
status_t
|
||||
BAdapterIO::SeekRequested(off_t position)
|
||||
{
|
||||
off_t bufferSize = 0;
|
||||
|
||||
status_t ret = GetSize(&bufferSize);
|
||||
if (ret != B_OK)
|
||||
return;
|
||||
|
||||
while((size_t)bufferSize < size) {
|
||||
GetSize(&bufferSize);
|
||||
snooze(100000);
|
||||
return B_ERROR;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
BAdapterIO::SeekCompleted(off_t position)
|
||||
{
|
||||
return fBuffer->ResetStartOffset(position);
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user