qemu/tests/qemu-iotests/207
Hanna Reitz 48f1fcd5c8 iotests/207: Filter host fingerprint
Commit e3296cc796 made the ssh block
driver's error message for fingerprint mismatches more verbose, so it
now prints the actual host key fingerprint and the key type.

iotest 207 tests such errors, but was not amended to filter that
fingerprint (which is host-specific), so do it now.  Filter the key
type, too, because I guess this too can differ depending on the host
configuration.

Fixes: e3296cc796
       ("block: print the server key type and fingerprint on failure")
Reported-by: John Snow <jsnow@redhat.com>
Signed-off-by: Hanna Reitz <hreitz@redhat.com>
Message-Id: <20220318125304.66131-3-hreitz@redhat.com>
Reviewed-by: John Snow <jsnow@redhat.com>
2022-03-22 10:50:10 +01:00

310 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
# group: rw
#
# Test ssh image creation
#
# Copyright (C) 2018 Red Hat, Inc.
#
# Creator/Owner: Kevin Wolf <kwolf@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import iotests
import subprocess
import re
iotests.script_initialize(
supported_fmts=['raw'],
supported_protocols=['ssh'],
)
def filter_hash(qmsg):
def _filter(key, value):
if key == 'hash' and re.match('[0-9a-f]+', value):
return 'HASH'
return value
if isinstance(qmsg, str):
# Strip key type and fingerprint
p = r"\S+ (key fingerprint) '(md5|sha1|sha256):[0-9a-f]+'"
return re.sub(p, r"\1 '\2:HASH'", qmsg)
else:
return iotests.filter_qmp(qmsg, _filter)
def blockdev_create(vm, options):
vm.blockdev_create(options, filters=[iotests.filter_qmp_testfiles, filter_hash])
with iotests.FilePath('t.img') as disk_path, \
iotests.VM() as vm:
remote_path = iotests.remote_filename(disk_path)
#
# Successful image creation (defaults)
#
iotests.log("=== Successful image creation (defaults) ===")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
}
},
'size': 4194304 })
vm.shutdown()
iotests.img_info_log(remote_path)
iotests.log("")
iotests.img_info_log(disk_path)
#
# Test host-key-check options
#
iotests.log("=== Test host-key-check options ===")
iotests.log("")
iotests.log("--- no host key checking --")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'none'
}
},
'size': 8388608 })
vm.shutdown()
iotests.img_info_log(remote_path)
iotests.log("--- known_hosts key checking --")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'known_hosts'
}
},
'size': 4194304 })
vm.shutdown()
iotests.img_info_log(remote_path)
keys = subprocess.check_output(
'ssh-keyscan 127.0.0.1 2>/dev/null | grep -v "\\^#" | ' +
'cut -d" " -f3',
shell=True).rstrip().decode('ascii').split('\n')
# Mappings of base64 representations to digests
md5_keys = {}
sha1_keys = {}
sha256_keys = {}
for key in keys:
md5_keys[key] = subprocess.check_output(
'echo %s | base64 -d | md5sum -b | cut -d" " -f1' % key,
shell=True).rstrip().decode('ascii')
sha1_keys[key] = subprocess.check_output(
'echo %s | base64 -d | sha1sum -b | cut -d" " -f1' % key,
shell=True).rstrip().decode('ascii')
sha256_keys[key] = subprocess.check_output(
'echo %s | base64 -d | sha256sum -b | cut -d" " -f1' % key,
shell=True).rstrip().decode('ascii')
vm.launch()
# Find correct key first
matching_key = None
for key in keys:
result = vm.qmp('blockdev-add',
driver='ssh', node_name='node0', path=disk_path,
server={
'host': '127.0.0.1',
'port': '22',
}, host_key_check={
'mode': 'hash',
'type': 'md5',
'hash': md5_keys[key],
})
if 'error' not in result:
vm.qmp('blockdev-del', node_name='node0')
matching_key = key
break
if matching_key is None:
vm.shutdown()
iotests.notrun('Did not find a key that fits 127.0.0.1')
iotests.log("--- explicit md5 key checking --")
iotests.log("")
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'md5',
'hash': 'wrong',
}
},
'size': 2097152 })
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'md5',
'hash': md5_keys[matching_key],
}
},
'size': 8388608 })
vm.shutdown()
iotests.img_info_log(remote_path)
iotests.log("--- explicit sha1 key checking --")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'sha1',
'hash': 'wrong',
}
},
'size': 2097152 })
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'sha1',
'hash': sha1_keys[matching_key],
}
},
'size': 4194304 })
vm.shutdown()
iotests.img_info_log(remote_path)
iotests.log("--- explicit sha256 key checking --")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'sha256',
'hash': 'wrong',
}
},
'size': 2097152 })
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'hash',
'type': 'sha256',
'hash': sha256_keys[matching_key],
}
},
'size': 4194304 })
vm.shutdown()
iotests.img_info_log(remote_path)
#
# Invalid path and user
#
iotests.log("=== Invalid path and user ===")
iotests.log("")
vm.launch()
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': '/this/is/not/an/existing/path',
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'none'
}
},
'size': 4194304 })
blockdev_create(vm, { 'driver': 'ssh',
'location': {
'path': disk_path,
'user': 'invalid user',
'server': {
'host': '127.0.0.1',
'port': '22'
},
'host-key-check': {
'mode': 'none'
}
},
'size': 4194304 })
vm.shutdown()