Improve block job rate limiting for small bandwidth values
ratelimit_calculate_delay() previously reset the accounting every time slice, no matter how much data had been processed before. This had (at least) two consequences: 1. The minimum speed is rather large, e.g. 5 MiB/s for commit and stream. Not sure if there are real-world use cases where this would be a problem. Mirroring and backup over a slow link (e.g. DSL) would come to mind, though. 2. Tests for block job operations (e.g. cancel) were rather racy All block jobs currently use a time slice of 100ms. That's a reasonable value to get smooth output during regular operation. However this also meant that the state of block jobs changed every 100ms, no matter how low the configured limit was. On busy hosts, qemu often transferred additional chunks until the test case had a chance to cancel the job. Fix the block job rate limit code to delay for more than one time slice to address the above issues. To make it easier to handle oversized chunks we switch the semantics from returning a delay _before_ the current request to a delay _after_ the current request. If necessary, this delay consists of multiple time slice units. Since the mirror job sends multiple chunks in one go even if the rate limit was exceeded in between, we need to keep track of the start of the current time slice so we can correctly re-compute the delay for the updated amount of data. The minimum bandwidth now is 1 data unit per time slice. The block jobs are currently passing the amount of data transferred in sectors and using 100ms time slices, so this translates to 5120 bytes/second. With chunk sizes usually being O(512KiB), tests have plenty of time (O(100s)) to operate on block jobs. The chance of a race condition now is fairly remote, except possibly on insanely loaded systems. Signed-off-by: Sascha Silbe <silbe@linux.vnet.ibm.com> Message-id: 1467127721-9564-2-git-send-email-silbe@linux.vnet.ibm.com Reviewed-by: Max Reitz <mreitz@redhat.com> Signed-off-by: Max Reitz <mreitz@redhat.com>
This commit is contained in:
parent
c834cba905
commit
f14a39ccb9
@ -113,6 +113,7 @@ static void coroutine_fn commit_run(void *opaque)
|
||||
CommitBlockJob *s = opaque;
|
||||
CommitCompleteData *data;
|
||||
int64_t sector_num, end;
|
||||
uint64_t delay_ns = 0;
|
||||
int ret = 0;
|
||||
int n = 0;
|
||||
void *buf = NULL;
|
||||
@ -142,10 +143,8 @@ static void coroutine_fn commit_run(void *opaque)
|
||||
buf = blk_blockalign(s->top, COMMIT_BUFFER_SIZE);
|
||||
|
||||
for (sector_num = 0; sector_num < end; sector_num += n) {
|
||||
uint64_t delay_ns = 0;
|
||||
bool copy;
|
||||
|
||||
wait:
|
||||
/* Note that even when no rate limit is applied we need to yield
|
||||
* with no pending I/O here so that bdrv_drain_all() returns.
|
||||
*/
|
||||
@ -161,12 +160,6 @@ wait:
|
||||
copy = (ret == 1);
|
||||
trace_commit_one_iteration(s, sector_num, n, ret);
|
||||
if (copy) {
|
||||
if (s->common.speed) {
|
||||
delay_ns = ratelimit_calculate_delay(&s->limit, n);
|
||||
if (delay_ns > 0) {
|
||||
goto wait;
|
||||
}
|
||||
}
|
||||
ret = commit_populate(s->top, s->base, sector_num, n, buf);
|
||||
bytes_written += n * BDRV_SECTOR_SIZE;
|
||||
}
|
||||
@ -182,6 +175,10 @@ wait:
|
||||
}
|
||||
/* Publish progress */
|
||||
s->common.offset += n * BDRV_SECTOR_SIZE;
|
||||
|
||||
if (copy && s->common.speed) {
|
||||
delay_ns = ratelimit_calculate_delay(&s->limit, n);
|
||||
}
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
@ -422,7 +422,9 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
|
||||
assert(io_sectors);
|
||||
sector_num += io_sectors;
|
||||
nb_chunks -= DIV_ROUND_UP(io_sectors, sectors_per_chunk);
|
||||
delay_ns += ratelimit_calculate_delay(&s->limit, io_sectors);
|
||||
if (s->common.speed) {
|
||||
delay_ns = ratelimit_calculate_delay(&s->limit, io_sectors);
|
||||
}
|
||||
}
|
||||
return delay_ns;
|
||||
}
|
||||
|
@ -95,6 +95,7 @@ static void coroutine_fn stream_run(void *opaque)
|
||||
BlockDriverState *base = s->base;
|
||||
int64_t sector_num = 0;
|
||||
int64_t end = -1;
|
||||
uint64_t delay_ns = 0;
|
||||
int error = 0;
|
||||
int ret = 0;
|
||||
int n = 0;
|
||||
@ -123,10 +124,8 @@ static void coroutine_fn stream_run(void *opaque)
|
||||
}
|
||||
|
||||
for (sector_num = 0; sector_num < end; sector_num += n) {
|
||||
uint64_t delay_ns = 0;
|
||||
bool copy;
|
||||
|
||||
wait:
|
||||
/* Note that even when no rate limit is applied we need to yield
|
||||
* with no pending I/O here so that bdrv_drain_all() returns.
|
||||
*/
|
||||
@ -156,12 +155,6 @@ wait:
|
||||
}
|
||||
trace_stream_one_iteration(s, sector_num, n, ret);
|
||||
if (copy) {
|
||||
if (s->common.speed) {
|
||||
delay_ns = ratelimit_calculate_delay(&s->limit, n);
|
||||
if (delay_ns > 0) {
|
||||
goto wait;
|
||||
}
|
||||
}
|
||||
ret = stream_populate(blk, sector_num, n, buf);
|
||||
}
|
||||
if (ret < 0) {
|
||||
@ -182,6 +175,9 @@ wait:
|
||||
|
||||
/* Publish progress */
|
||||
s->common.offset += n * BDRV_SECTOR_SIZE;
|
||||
if (copy && s->common.speed) {
|
||||
delay_ns = ratelimit_calculate_delay(&s->limit, n);
|
||||
}
|
||||
}
|
||||
|
||||
if (!base) {
|
||||
|
@ -15,34 +15,59 @@
|
||||
#define QEMU_RATELIMIT_H
|
||||
|
||||
typedef struct {
|
||||
int64_t next_slice_time;
|
||||
int64_t slice_start_time;
|
||||
int64_t slice_end_time;
|
||||
uint64_t slice_quota;
|
||||
uint64_t slice_ns;
|
||||
uint64_t dispatched;
|
||||
} RateLimit;
|
||||
|
||||
/** Calculate and return delay for next request in ns
|
||||
*
|
||||
* Record that we sent @p n data units. If we may send more data units
|
||||
* in the current time slice, return 0 (i.e. no delay). Otherwise
|
||||
* return the amount of time (in ns) until the start of the next time
|
||||
* slice that will permit sending the next chunk of data.
|
||||
*
|
||||
* Recording sent data units even after exceeding the quota is
|
||||
* permitted; the time slice will be extended accordingly.
|
||||
*/
|
||||
static inline int64_t ratelimit_calculate_delay(RateLimit *limit, uint64_t n)
|
||||
{
|
||||
int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
|
||||
uint64_t delay_slices;
|
||||
|
||||
if (limit->next_slice_time < now) {
|
||||
limit->next_slice_time = now + limit->slice_ns;
|
||||
assert(limit->slice_quota && limit->slice_ns);
|
||||
|
||||
if (limit->slice_end_time < now) {
|
||||
/* Previous, possibly extended, time slice finished; reset the
|
||||
* accounting. */
|
||||
limit->slice_start_time = now;
|
||||
limit->slice_end_time = now + limit->slice_ns;
|
||||
limit->dispatched = 0;
|
||||
}
|
||||
if (limit->dispatched == 0 || limit->dispatched + n <= limit->slice_quota) {
|
||||
limit->dispatched += n;
|
||||
|
||||
limit->dispatched += n;
|
||||
if (limit->dispatched < limit->slice_quota) {
|
||||
/* We may send further data within the current time slice, no
|
||||
* need to delay the next request. */
|
||||
return 0;
|
||||
} else {
|
||||
limit->dispatched = n;
|
||||
return limit->next_slice_time - now;
|
||||
}
|
||||
|
||||
/* Quota exceeded. Calculate the next time slice we may start
|
||||
* sending data again. */
|
||||
delay_slices = (limit->dispatched + limit->slice_quota - 1) /
|
||||
limit->slice_quota;
|
||||
limit->slice_end_time = limit->slice_start_time +
|
||||
delay_slices * limit->slice_ns;
|
||||
return limit->slice_end_time - now;
|
||||
}
|
||||
|
||||
static inline void ratelimit_set_speed(RateLimit *limit, uint64_t speed,
|
||||
uint64_t slice_ns)
|
||||
{
|
||||
limit->slice_ns = slice_ns;
|
||||
limit->slice_quota = ((double)speed * slice_ns)/1000000000ULL;
|
||||
limit->slice_quota = MAX(((double)speed * slice_ns) / 1000000000ULL, 1);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user