qemu/tests/migration/guestperf/engine.py
Hyman f9cc00346d tests/migration: fix unix socket batch migration
when execute the following test command:
"guestperf-batch.py --dst-host localhost --transport unix ..."
test aborts and error message as the following be throwed:
"launching VM Failed: [Errno 98] Address already in use".

The reason is that batch script use the same monitor socket
in all test cases and do not remove the socket file. The second
migration test will launch vm use the same socket file as
the first, so we get the error message. To fix it, just remove
the socket file each time we have done the migration test.

Signed-off-by: Hyman <huangy81@chinatelecom.cn>
Message-Id: <c3fc438993b87a6ab0bea3d07f6ca0260d29936e.1615397103.git.huangy81@chinatelecom.cn>
Reviewed-by: Philippe Mathieu-Daudé <philmd@redhat.com>
Signed-off-by: Cleber Rosa <crosa@redhat.com>
2021-03-16 21:08:37 -04:00

446 lines
16 KiB
Python

#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#
import os
import re
import sys
import time
from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report
from guestperf.timings import TimingRecord, Timings
sys.path.append(os.path.join(os.path.dirname(__file__),
'..', '..', '..', 'python'))
from qemu.machine import QEMUMachine
class Engine(object):
def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
sleep=15, verbose=False, debug=False):
self._binary = binary # Path to QEMU binary
self._dst_host = dst_host # Hostname of target host
self._kernel = kernel # Path to kernel image
self._initrd = initrd # Path to stress initrd
self._transport = transport # 'unix' or 'tcp' or 'rdma'
self._sleep = sleep
self._verbose = verbose
self._debug = debug
if debug:
self._verbose = debug
def _vcpu_timing(self, pid, tid_list):
records = []
now = time.time()
jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
for tid in tid_list:
statfile = "/proc/%d/task/%d/stat" % (pid, tid)
with open(statfile, "r") as fh:
stat = fh.readline()
fields = stat.split(" ")
stime = int(fields[13])
utime = int(fields[14])
records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
return records
def _cpu_timing(self, pid):
records = []
now = time.time()
jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
statfile = "/proc/%d/stat" % pid
with open(statfile, "r") as fh:
stat = fh.readline()
fields = stat.split(" ")
stime = int(fields[13])
utime = int(fields[14])
return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
def _migrate_progress(self, vm):
info = vm.command("query-migrate")
if "ram" not in info:
info["ram"] = {}
return Progress(
info.get("status", "active"),
ProgressStats(
info["ram"].get("transferred", 0),
info["ram"].get("remaining", 0),
info["ram"].get("total", 0),
info["ram"].get("duplicate", 0),
info["ram"].get("skipped", 0),
info["ram"].get("normal", 0),
info["ram"].get("normal-bytes", 0),
info["ram"].get("dirty-pages-rate", 0),
info["ram"].get("mbps", 0),
info["ram"].get("dirty-sync-count", 0)
),
time.time(),
info.get("total-time", 0),
info.get("downtime", 0),
info.get("expected-downtime", 0),
info.get("setup-time", 0),
info.get("x-cpu-throttle-percentage", 0),
)
def _migrate(self, hardware, scenario, src, dst, connect_uri):
src_qemu_time = []
src_vcpu_time = []
src_pid = src.get_pid()
vcpus = src.command("query-cpus")
src_threads = []
for vcpu in vcpus:
src_threads.append(vcpu["thread_id"])
# XXX how to get dst timings on remote host ?
if self._verbose:
print("Sleeping %d seconds for initial guest workload run" % self._sleep)
sleep_secs = self._sleep
while sleep_secs > 1:
src_qemu_time.append(self._cpu_timing(src_pid))
src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
time.sleep(1)
sleep_secs -= 1
if self._verbose:
print("Starting migration")
if scenario._auto_converge:
resp = src.command("migrate-set-capabilities",
capabilities = [
{ "capability": "auto-converge",
"state": True }
])
resp = src.command("migrate-set-parameters",
x_cpu_throttle_increment=scenario._auto_converge_step)
if scenario._post_copy:
resp = src.command("migrate-set-capabilities",
capabilities = [
{ "capability": "postcopy-ram",
"state": True }
])
resp = dst.command("migrate-set-capabilities",
capabilities = [
{ "capability": "postcopy-ram",
"state": True }
])
resp = src.command("migrate_set_speed",
value=scenario._bandwidth * 1024 * 1024)
resp = src.command("migrate_set_downtime",
value=scenario._downtime / 1024.0)
if scenario._compression_mt:
resp = src.command("migrate-set-capabilities",
capabilities = [
{ "capability": "compress",
"state": True }
])
resp = src.command("migrate-set-parameters",
compress_threads=scenario._compression_mt_threads)
resp = dst.command("migrate-set-capabilities",
capabilities = [
{ "capability": "compress",
"state": True }
])
resp = dst.command("migrate-set-parameters",
decompress_threads=scenario._compression_mt_threads)
if scenario._compression_xbzrle:
resp = src.command("migrate-set-capabilities",
capabilities = [
{ "capability": "xbzrle",
"state": True }
])
resp = dst.command("migrate-set-capabilities",
capabilities = [
{ "capability": "xbzrle",
"state": True }
])
resp = src.command("migrate-set-cache-size",
value=(hardware._mem * 1024 * 1024 * 1024 / 100 *
scenario._compression_xbzrle_cache))
resp = src.command("migrate", uri=connect_uri)
post_copy = False
paused = False
progress_history = []
start = time.time()
loop = 0
while True:
loop = loop + 1
time.sleep(0.05)
progress = self._migrate_progress(src)
if (loop % 20) == 0:
src_qemu_time.append(self._cpu_timing(src_pid))
src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
if (len(progress_history) == 0 or
(progress_history[-1]._ram._iterations <
progress._ram._iterations)):
progress_history.append(progress)
if progress._status in ("completed", "failed", "cancelled"):
if progress._status == "completed" and paused:
dst.command("cont")
if progress_history[-1] != progress:
progress_history.append(progress)
if progress._status == "completed":
if self._verbose:
print("Sleeping %d seconds for final guest workload run" % self._sleep)
sleep_secs = self._sleep
while sleep_secs > 1:
time.sleep(1)
src_qemu_time.append(self._cpu_timing(src_pid))
src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
sleep_secs -= 1
return [progress_history, src_qemu_time, src_vcpu_time]
if self._verbose and (loop % 20) == 0:
print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
progress._ram._iterations,
progress._ram._remaining_bytes / (1024 * 1024),
progress._ram._total_bytes / (1024 * 1024),
progress._ram._transferred_bytes / (1024 * 1024),
progress._ram._transfer_rate_mbs,
))
if progress._ram._iterations > scenario._max_iters:
if self._verbose:
print("No completion after %d iterations over RAM" % scenario._max_iters)
src.command("migrate_cancel")
continue
if time.time() > (start + scenario._max_time):
if self._verbose:
print("No completion after %d seconds" % scenario._max_time)
src.command("migrate_cancel")
continue
if (scenario._post_copy and
progress._ram._iterations >= scenario._post_copy_iters and
not post_copy):
if self._verbose:
print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
resp = src.command("migrate-start-postcopy")
post_copy = True
if (scenario._pause and
progress._ram._iterations >= scenario._pause_iters and
not paused):
if self._verbose:
print("Pausing VM after %d iterations" % scenario._pause_iters)
resp = src.command("stop")
paused = True
def _get_common_args(self, hardware, tunnelled=False):
args = [
"noapic",
"edd=off",
"printk.time=1",
"noreplace-smp",
"cgroup_disable=memory",
"pci=noearly",
"console=ttyS0",
]
if self._debug:
args.append("debug")
else:
args.append("quiet")
args.append("ramsize=%s" % hardware._mem)
cmdline = " ".join(args)
if tunnelled:
cmdline = "'" + cmdline + "'"
argv = [
"-accel", "kvm",
"-cpu", "host",
"-kernel", self._kernel,
"-initrd", self._initrd,
"-append", cmdline,
"-chardev", "stdio,id=cdev0",
"-device", "isa-serial,chardev=cdev0",
"-m", str((hardware._mem * 1024) + 512),
"-smp", str(hardware._cpus),
]
if self._debug:
argv.extend(["-device", "sga"])
if hardware._prealloc_pages:
argv_source += ["-mem-path", "/dev/shm",
"-mem-prealloc"]
if hardware._locked_pages:
argv_source += ["-overcommit", "mem-lock=on"]
if hardware._huge_pages:
pass
return argv
def _get_src_args(self, hardware):
return self._get_common_args(hardware)
def _get_dst_args(self, hardware, uri):
tunnelled = False
if self._dst_host != "localhost":
tunnelled = True
argv = self._get_common_args(hardware, tunnelled)
return argv + ["-incoming", uri]
@staticmethod
def _get_common_wrapper(cpu_bind, mem_bind):
wrapper = []
if len(cpu_bind) > 0 or len(mem_bind) > 0:
wrapper.append("numactl")
if cpu_bind:
wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
if mem_bind:
wrapper.append("--membind=%s" % ",".join(mem_bind))
return wrapper
def _get_src_wrapper(self, hardware):
return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
def _get_dst_wrapper(self, hardware):
wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
if self._dst_host != "localhost":
return ["ssh",
"-R", "9001:localhost:9001",
self._dst_host] + wrapper
else:
return wrapper
def _get_timings(self, vm):
log = vm.get_log()
if not log:
return []
if self._debug:
print(log)
regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
matcher = re.compile(regex)
records = []
for line in log.split("\n"):
match = matcher.match(line)
if match:
records.append(TimingRecord(int(match.group(1)),
int(match.group(2)) / 1000.0,
int(match.group(3))))
return records
def run(self, hardware, scenario, result_dir=os.getcwd()):
abs_result_dir = os.path.join(result_dir, scenario._name)
if self._transport == "tcp":
uri = "tcp:%s:9000" % self._dst_host
elif self._transport == "rdma":
uri = "rdma:%s:9000" % self._dst_host
elif self._transport == "unix":
if self._dst_host != "localhost":
raise Exception("Running use unix migration transport for non-local host")
uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
try:
os.remove(uri[5:])
os.remove(monaddr)
except:
pass
if self._dst_host != "localhost":
dstmonaddr = ("localhost", 9001)
else:
dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
src = QEMUMachine(self._binary,
args=self._get_src_args(hardware),
wrapper=self._get_src_wrapper(hardware),
name="qemu-src-%d" % os.getpid(),
monitor_address=srcmonaddr)
dst = QEMUMachine(self._binary,
args=self._get_dst_args(hardware, uri),
wrapper=self._get_dst_wrapper(hardware),
name="qemu-dst-%d" % os.getpid(),
monitor_address=dstmonaddr)
try:
src.launch()
dst.launch()
ret = self._migrate(hardware, scenario, src, dst, uri)
progress_history = ret[0]
qemu_timings = ret[1]
vcpu_timings = ret[2]
if uri[0:5] == "unix:":
os.remove(uri[5:])
if os.path.exists(srcmonaddr):
os.remove(srcmonaddr)
if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
os.remove(dstmonaddr)
if self._verbose:
print("Finished migration")
src.shutdown()
dst.shutdown()
return Report(hardware, scenario, progress_history,
Timings(self._get_timings(src) + self._get_timings(dst)),
Timings(qemu_timings),
Timings(vcpu_timings),
self._binary, self._dst_host, self._kernel,
self._initrd, self._transport, self._sleep)
except Exception as e:
if self._debug:
print("Failed: %s" % str(e))
try:
src.shutdown()
except:
pass
try:
dst.shutdown()
except:
pass
if self._debug:
print(src.get_log())
print(dst.get_log())
raise