Fixed the write behavior: Blocking writes should write what they can

and loop until everything has been written. Non-blocking writes should
write as much as they can and return B_WOULD_BLOCK, if that wasn't the
whole request.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@25122 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-04-24 02:23:24 +00:00
parent 5843f17cb8
commit cfb0e47367
2 changed files with 141 additions and 35 deletions

View File

@ -197,18 +197,21 @@ TRACEBQ_ONLY(ParanoiaReadCheck(*_buffer));
status_t
UnixBufferQueue::Write(net_buffer* buffer)
UnixBufferQueue::Write(net_buffer* buffer, size_t maxSize)
{
TRACEBQ("unix: UnixBufferQueue::Write(%lu): fSize: %lu, fRead: %lld, "
"fWritten: %lld", buffer->size, fSize, fRead, fWritten);
TRACEBQ("unix: UnixBufferQueue::Write(%lu/%lu): fSize: %lu, fRead: %lld, "
"fWritten: %lld", buffer->size, maxSize, fSize, fRead, fWritten);
TRACEBQ_ONLY(
MethodDeleter<UnixBufferQueue> _(this, &UnixBufferQueue::PostReadWrite);
)
if (buffer->size > Writable())
return ENOBUFS;
maxSize = min_c(buffer->size, maxSize);
if (maxSize > Writable())
RETURN_ERROR(ENOBUFS);
// If we shall write the complete buffer, things are easy.
if (maxSize == buffer->size) {
fBuffers.Add(buffer);
fSize += buffer->size;
TRACEBQ_ONLY(fWritten += buffer->size);
@ -216,6 +219,50 @@ UnixBufferQueue::Write(net_buffer* buffer)
return B_OK;
}
// We shall write only a partial buffer. We need to create a new one and
// cut of the head of the old one.
// TODO: This implementation obviously sucks, but we can't use the split method,
// since it would split off the wrong buffer. The socket module requires us
// to cut off the head of the given one.
// create a temporary buffer
void* tmpBuffer = malloc(maxSize);
if (tmpBuffer == NULL)
return B_OK;
MemoryDeleter tmpBufferDeleter(tmpBuffer);
// read the data to append into the temporary buffer
status_t error = gBufferModule->read(buffer, 0, tmpBuffer, maxSize);
if (error != B_OK)
return error;
// create the new buffer and append the data
net_buffer* newBuffer = gBufferModule->create(256);
if (newBuffer == NULL)
return ENOBUFS;
error = gBufferModule->append(newBuffer, tmpBuffer, maxSize);
// remove the header from the old buffer
if (error == B_OK)
error = gBufferModule->remove_header(buffer, maxSize);
if (error != B_OK) {
gBufferModule->free(newBuffer);
return error;
}
// transfer the ancillary data
gBufferModule->transfer_ancillary_data(buffer, newBuffer);
// Everything went fine. Append the new buffer.
fBuffers.Add(newBuffer);
fSize += newBuffer->size;
TRACEBQ_ONLY(fWritten += newBuffer->size);
return B_OK;
}
void
UnixBufferQueue::SetCapacity(size_t capacity)
@ -364,11 +411,15 @@ UnixFifo::Write(net_buffer* buffer, bigtime_t timeout)
if (IsWriteShutdown())
return UNIX_FIFO_SHUTDOWN;
if (IsReadShutdown())
return EPIPE;
Request request(buffer->size);
fWriters.Add(&request);
fWriteRequested += request.size;
size_t bytesWritten = 0;
status_t error = _Write(request, buffer, timeout);
status_t error = _Write(request, buffer, timeout, bytesWritten);
bool firstInQueue = fWriters.Head() == &request;
fWriters.Remove(&request);
@ -381,9 +432,9 @@ UnixFifo::Write(net_buffer* buffer, bigtime_t timeout)
fWriteCondition.NotifyAll();
}
if (error == B_OK && request.size > 0 && !fReaders.IsEmpty()
if (bytesWritten > 0 && request.size > 0 && !fReaders.IsEmpty()
&& !IsReadShutdown()) {
// We've written something and there are readers. Notify them
// We've written something and there are readers. Notify them.
fReadCondition.NotifyAll();
}
@ -490,12 +541,13 @@ UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
status_t
UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout,
size_t& bytesWritten)
{
// wait for the request to reach the front of the queue
if (fWriters.Head() != &request && timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
if (timeout == 0)
RETURN_ERROR(_WriteNonBlocking(request, buffer, bytesWritten));
// wait for the request to reach the front of the queue
while (fWriters.Head() != &request && !IsWriteShutdown()) {
ConditionVariableEntry entry;
fWriteCondition.Add(&entry, B_CAN_INTERRUPT);
@ -514,17 +566,21 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
if (IsReadShutdown())
return EPIPE;
// wait for any space to become available
if (fBuffer.Writable() < request.size && timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
if (request.size == 0)
return B_OK;
while (fBuffer.Writable() < request.size && !IsWriteShutdown()
status_t error = B_OK;
size_t bytesLeft = buffer->size;
while (error == B_OK && bytesLeft > 0) {
// wait for any space to become available
while (error == B_OK && fBuffer.Writable() == 0 && !IsWriteShutdown()
&& !IsReadShutdown()) {
ConditionVariableEntry entry;
fWriteCondition.Add(&entry, B_CAN_INTERRUPT);
benaphore_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
benaphore_lock(&fLock);
if (error != B_OK)
@ -537,5 +593,52 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
if (IsReadShutdown())
return EPIPE;
RETURN_ERROR(fBuffer.Write(buffer));
// write as much as we can
size_t toWrite = min_c(fBuffer.Writable(), bytesLeft);
error = fBuffer.Write(buffer, toWrite);
if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
bytesWritten += toWrite;
bytesLeft -= toWrite;
}
}
RETURN_ERROR(error);
}
status_t
UnixFifo::_WriteNonBlocking(Request& request, net_buffer* buffer,
size_t& bytesWritten)
{
// We need to be first in queue and space should be available right now,
// otherwise we need to fail.
if (fWriters.Head() != &request || fBuffer.Writable() == 0)
RETURN_ERROR(B_WOULD_BLOCK);
if (request.size == 0)
return B_OK;
// Write as much as we can.
size_t toWrite = min_c(fBuffer.Writable(), buffer->size);
status_t error;
if (buffer->size <= fBuffer.Writable()) {
// enough space available
error = fBuffer.Write(buffer, toWrite);
if (error == B_OK)
bytesWritten = toWrite;
} else {
// not enough space available -- write what we can, but return
// B_WOULD_BLOCK nevertheless
error = fBuffer.Write(buffer,toWrite);
if (error == B_OK) {
bytesWritten = toWrite;
error = B_WOULD_BLOCK;
}
}
RETURN_ERROR(error);
}

View File

@ -38,7 +38,7 @@ public:
{ return fCapacity >= fSize ? fCapacity - fSize : 0; }
status_t Read(size_t size, net_buffer** _buffer);
status_t Write(net_buffer* buffer);
status_t Write(net_buffer* buffer, size_t maxSize);
size_t Capacity() const { return fCapacity; }
void SetCapacity(size_t capacity);
@ -118,7 +118,10 @@ private:
private:
status_t _Read(Request& request, size_t numBytes, bigtime_t timeout,
net_buffer** _buffer);
status_t _Write(Request& request, net_buffer* buffer, bigtime_t timeout);
status_t _Write(Request& request, net_buffer* buffer, bigtime_t timeout,
size_t& bytesWritten);
status_t _WriteNonBlocking(Request& request, net_buffer* buffer,
size_t& bytesWritten);
private:
benaphore fLock;