diff --git a/bindings/python/sample_all.sh b/bindings/python/sample_all.sh index f4e7a55c..a5e27809 100755 --- a/bindings/python/sample_all.sh +++ b/bindings/python/sample_all.sh @@ -1,15 +1,29 @@ #!/bin/sh -./sample_x86.py +python3 ./sample_arm.py echo "==========================" -./shellcode.py +python3 ./sample_armeb.py echo "==========================" -./sample_arm.py +python3 ./sample_arm64.py echo "==========================" -./sample_arm64.py +python3 ./sample_arm64eb.py echo "==========================" -./sample_mips.py +python3 ./sample_m68k.py echo "==========================" -./sample_sparc.py +python3 ./sample_mips.py echo "==========================" -./sample_m68k.py +python3 ./sample_ppc.py +echo "==========================" +python3 ./sample_riscv.py +echo "==========================" +python3 ./sample_s390x.py +echo "==========================" +python3 ./sample_sparc.py +echo "==========================" +python3 ./sample_tricore.py +echo "==========================" +python3 ./sample_x86.py +echo "==========================" +python3 ./shellcode.py +echo "==========================" +python3 ./sample_ctl.py diff --git a/bindings/python/sample_ctl.py b/bindings/python/sample_ctl.py index 1578a010..c1cbc070 100755 --- a/bindings/python/sample_ctl.py +++ b/bindings/python/sample_ctl.py @@ -3,7 +3,6 @@ # By Lazymio(@wtdcode), 2021 from unicorn import * -from unicorn.unicorn import UC_HOOK_EDGE_GEN_CB from unicorn.x86_const import * from datetime import datetime @@ -58,7 +57,7 @@ def test_uc_ctl_tb_cache(): # Now we clear cache for all TBs. for i in range(8): uc.ctl_remove_cache(addr + i * 512, addr + i * 512 + 1) - + evicted = time_emulation(uc, addr, addr + len(code)) print(f">>> Run time: First time {standard}, Cached: {cached}, Cached evicted: {evicted}") @@ -93,7 +92,7 @@ def test_uc_ctl_exits(): uc.hook_add(UC_HOOK_EDGE_GENERATED, trace_new_edge) # Trace cmp instruction. - uc.hook_add(UC_HOOK_TCG_OPCODE, trace_tcg_sub, UC_TCG_OP_SUB, UC_TCG_OP_FLAG_CMP) + uc.hook_add(UC_HOOK_TCG_OPCODE, trace_tcg_sub, aux1=UC_TCG_OP_SUB, aux2=UC_TCG_OP_FLAG_CMP) uc.ctl_exits_enabled(True) diff --git a/bindings/python/unicorn/__init__.py b/bindings/python/unicorn/__init__.py index fe47b081..d3e4354f 100644 --- a/bindings/python/unicorn/__init__.py +++ b/bindings/python/unicorn/__init__.py @@ -1,4 +1,6 @@ -# Unicorn Python bindings, by Nguyen Anh Quynnh +# New and improved Unicorn Python bindings by elicn +# based on Nguyen Anh Quynnh's work + from . import arm_const, arm64_const, mips_const, sparc_const, m68k_const, x86_const, riscv_const, s390x_const, tricore_const from .unicorn_const import * -from .unicorn import Uc, uc_version, uc_arch_supported, version_bind, debug, UcError, __version__ +from .unicorn import Uc, ucsubclass, uc_version, uc_arch_supported, version_bind, debug, UcError, __version__ diff --git a/bindings/python/unicorn/arch/__init__.py b/bindings/python/unicorn/arch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bindings/python/unicorn/arch/arm.py b/bindings/python/unicorn/arch/arm.py new file mode 100644 index 00000000..0669072d --- /dev/null +++ b/bindings/python/unicorn/arch/arm.py @@ -0,0 +1,81 @@ +# AArch32 classes and structures. +# +# @author elicn + +from typing import Any, Tuple + +import ctypes + +from .. import Uc +from .. import arm_const as const + +from .types import UcTupledReg, UcReg128 + +ARMCPReg = Tuple[int, int, int, int, int, int, int, int] + + +class UcRegCP(UcTupledReg[ARMCPReg]): + """ARM coprocessors registers for instructions MRC, MCR, MRRC, MCRR + """ + + _fields_ = ( + ('cp', ctypes.c_uint32), + ('is64', ctypes.c_uint32), + ('sec', ctypes.c_uint32), + ('crn', ctypes.c_uint32), + ('crm', ctypes.c_uint32), + ('opc1', ctypes.c_uint32), + ('opc2', ctypes.c_uint32), + ('val', ctypes.c_uint64) + ) + + @property + def value(self) -> int: + return self.val + + +class UcAArch32(Uc): + """Unicorn subclass for ARM architecture. + """ + + REG_RANGE_Q = range(const.UC_ARM_REG_Q0, const.UC_ARM_REG_Q15 + 1) + + @staticmethod + def __select_reg_class(reg_id: int): + """Select class for special architectural registers. + """ + + reg_class = ( + (UcAArch32.REG_RANGE_Q, UcReg128), + ) + + return next((cls for rng, cls in reg_class if reg_id in rng), None) + + def reg_read(self, reg_id: int, aux: Any = None): + # select register class for special cases + reg_cls = UcAArch32.__select_reg_class(reg_id) + + if reg_cls is None: + if reg_id == const.UC_ARM_REG_CP_REG: + return self._reg_read(reg_id, UcRegCP, *aux) + + else: + # fallback to default reading method + return super().reg_read(reg_id, aux) + + return self._reg_read(reg_id, reg_cls) + + def reg_write(self, reg_id: int, value) -> None: + # select register class for special cases + reg_cls = UcAArch32.__select_reg_class(reg_id) + + if reg_cls is None: + if reg_id == const.UC_ARM_REG_CP_REG: + self._reg_write(reg_id, UcRegCP, value) + + else: + # fallback to default writing method + super().reg_write(reg_id, value) + + else: + self._reg_write(reg_id, reg_cls, value) diff --git a/bindings/python/unicorn/arch/arm64.py b/bindings/python/unicorn/arch/arm64.py new file mode 100644 index 00000000..475957a8 --- /dev/null +++ b/bindings/python/unicorn/arch/arm64.py @@ -0,0 +1,126 @@ +# AArch64 classes and structures. +# +# @author elicn + +from typing import Any, Callable, NamedTuple, Tuple + +import ctypes + +from .. import Uc, UcError +from .. import arm64_const as const + +from ..unicorn import uccallback +from ..unicorn_const import UC_ERR_ARG, UC_HOOK_INSN +from .types import uc_engine, UcTupledReg, UcReg128 + +ARM64CPReg = Tuple[int, int, int, int, int, int] + +HOOK_INSN_SYS_CFUNC = ctypes.CFUNCTYPE(ctypes.c_uint32, uc_engine, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p) + + +class UcRegCP(UcTupledReg[ARM64CPReg]): + """ARM64 coprocessors registers for instructions MRS, MSR + """ + + _fields_ = ( + ('crn', ctypes.c_uint32), + ('crm', ctypes.c_uint32), + ('op0', ctypes.c_uint32), + ('op1', ctypes.c_uint32), + ('op2', ctypes.c_uint32), + ('val', ctypes.c_uint64) + ) + + @property + def value(self) -> int: + return self.val + + +class UcAArch64(Uc): + """Unicorn subclass for ARM64 architecture. + """ + + REG_RANGE_Q = range(const.UC_ARM64_REG_Q0, const.UC_ARM64_REG_Q31 + 1) + REG_RANGE_V = range(const.UC_ARM64_REG_V0, const.UC_ARM64_REG_V31 + 1) + + def hook_add(self, htype: int, callback: Callable, user_data: Any = None, begin: int = 1, end: int = 0, aux1: int = 0, aux2: int = 0) -> int: + if htype != UC_HOOK_INSN: + return super().hook_add(htype, callback, user_data, begin, end, aux1, aux2) + + insn = ctypes.c_int(aux1) + + def __hook_insn_sys(): + @uccallback(HOOK_INSN_SYS_CFUNC) + def __hook_insn_sys_cb(handle: int, reg: int, pcp_reg: Any, key: int) -> int: + cp_reg = ctypes.cast(pcp_reg, ctypes.POINTER(UcRegCP)).contents + + class CpReg(NamedTuple): + crn: int + crm: int + op0: int + op1: int + op2: int + val: int + + cp_reg = CpReg(cp_reg.crn, cp_reg.crm, cp_reg.op0, cp_reg.op1, cp_reg.op2, cp_reg.val) + + return callback(self, reg, cp_reg, user_data) + + return __hook_insn_sys_cb + + handlers = { + const.UC_ARM64_INS_MRS : __hook_insn_sys, + const.UC_ARM64_INS_MSR : __hook_insn_sys, + const.UC_ARM64_INS_SYS : __hook_insn_sys, + const.UC_ARM64_INS_SYSL : __hook_insn_sys + } + + handler = handlers.get(insn.value) + + if handler is None: + raise UcError(UC_ERR_ARG) + + fptr = handler() + + return getattr(self, '_Uc__do_hook_add')(htype, fptr, begin, end, insn) + + @staticmethod + def __select_reg_class(reg_id: int): + """Select class for special architectural registers. + """ + + reg_class = ( + (UcAArch64.REG_RANGE_Q, UcReg128), + (UcAArch64.REG_RANGE_V, UcReg128) + ) + + return next((cls for rng, cls in reg_class if reg_id in rng), None) + + def reg_read(self, reg_id: int, aux: Any = None): + # select register class for special cases + reg_cls = UcAArch64.__select_reg_class(reg_id) + + if reg_cls is None: + if reg_id == const.UC_ARM64_REG_CP_REG: + return self._reg_read(reg_id, UcRegCP, *aux) + + else: + # fallback to default reading method + return super().reg_read(reg_id, aux) + + return self._reg_read(reg_id, reg_cls) + + def reg_write(self, reg_id: int, value) -> None: + # select register class for special cases + reg_cls = UcAArch64.__select_reg_class(reg_id) + + if reg_cls is None: + if reg_id == const.UC_ARM64_REG_CP_REG: + self._reg_write(reg_id, UcRegCP, value) + + else: + # fallback to default writing method + super().reg_write(reg_id, value) + + else: + self._reg_write(reg_id, reg_cls, value) diff --git a/bindings/python/unicorn/arch/intel.py b/bindings/python/unicorn/arch/intel.py new file mode 100644 index 00000000..010a881b --- /dev/null +++ b/bindings/python/unicorn/arch/intel.py @@ -0,0 +1,178 @@ +# Intel architecture classes and structures. +# +# @author elicn + +from typing import Any, Callable, Tuple + +import ctypes + +from .. import Uc, UcError +from .. import x86_const as const + +from ..unicorn import uccallback +from ..unicorn_const import UC_ERR_ARG, UC_HOOK_INSN +from .types import uc_engine, UcTupledReg, UcReg128, UcReg256, UcReg512 + +X86MMRReg = Tuple[int, int, int, int] +X86MSRReg = Tuple[int, int] +X86FPReg = Tuple[int, int] + +HOOK_INSN_IN_CFUNC = ctypes.CFUNCTYPE(ctypes.c_uint32, uc_engine, ctypes.c_uint32, ctypes.c_int, ctypes.c_void_p) +HOOK_INSN_OUT_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint32, ctypes.c_int, ctypes.c_uint32, ctypes.c_void_p) +HOOK_INSN_SYSCALL_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_void_p) +HOOK_INSN_CPUID_CFUNC = ctypes.CFUNCTYPE(ctypes.c_uint32, uc_engine, ctypes.c_void_p) + + +class UcRegMMR(UcTupledReg[X86MMRReg]): + """Memory-Management Register for instructions IDTR, GDTR, LDTR, TR. + """ + + _fields_ = ( + ('selector', ctypes.c_uint16), # not used by GDTR and IDTR + ('base', ctypes.c_uint64), # handle 32 or 64 bit CPUs + ('limit', ctypes.c_uint32), + ('flags', ctypes.c_uint32) # not used by GDTR and IDTR + ) + + +class UcRegMSR(UcTupledReg[X86MSRReg]): + _fields_ = ( + ('rid', ctypes.c_uint32), + ('val', ctypes.c_uint64) + ) + + @property + def value(self) -> int: + return self.val + + +class UcRegFPR(UcTupledReg[X86FPReg]): + _fields_ = ( + ('mantissa', ctypes.c_uint64), + ('exponent', ctypes.c_uint16) + ) + + +class UcIntel(Uc): + """Unicorn subclass for Intel architecture. + """ + + REG_RANGE_MMR = ( + const.UC_X86_REG_IDTR, + const.UC_X86_REG_GDTR, + const.UC_X86_REG_LDTR, + const.UC_X86_REG_TR + ) + + REG_RANGE_FP = range(const.UC_X86_REG_FP0, const.UC_X86_REG_FP7 + 1) + REG_RANGE_XMM = range(const.UC_X86_REG_XMM0, const.UC_X86_REG_XMM31 + 1) + REG_RANGE_YMM = range(const.UC_X86_REG_YMM0, const.UC_X86_REG_YMM31 + 1) + REG_RANGE_ZMM = range(const.UC_X86_REG_ZMM0, const.UC_X86_REG_ZMM31 + 1) + + def hook_add(self, htype: int, callback: Callable, user_data: Any = None, begin: int = 1, end: int = 0, aux1: int = 0, aux2: int = 0) -> int: + if htype != UC_HOOK_INSN: + return super().hook_add(htype, callback, user_data, begin, end, aux1, aux2) + + insn = ctypes.c_int(aux1) + + def __hook_insn_in(): + @uccallback(HOOK_INSN_IN_CFUNC) + def __hook_insn_in_cb(handle: int, port: int, size: int, key: int) -> int: + return callback(self, port, size, user_data) + + return __hook_insn_in_cb + + def __hook_insn_out(): + @uccallback(HOOK_INSN_OUT_CFUNC) + def __hook_insn_out_cb(handle: int, port: int, size: int, value: int, key: int): + callback(self, port, size, value, user_data) + + return __hook_insn_out_cb + + def __hook_insn_syscall(): + @uccallback(HOOK_INSN_SYSCALL_CFUNC) + def __hook_insn_syscall_cb(handle: int, key: int): + callback(self, user_data) + + return __hook_insn_syscall_cb + + def __hook_insn_cpuid(): + @uccallback(HOOK_INSN_CPUID_CFUNC) + def __hook_insn_cpuid_cb(handle: int, key: int) -> int: + return callback(self, user_data) + + return __hook_insn_cpuid_cb + + handlers = { + const.UC_X86_INS_IN : __hook_insn_in, + const.UC_X86_INS_OUT : __hook_insn_out, + const.UC_X86_INS_SYSCALL : __hook_insn_syscall, + const.UC_X86_INS_SYSENTER : __hook_insn_syscall, + const.UC_X86_INS_CPUID : __hook_insn_cpuid + } + + handler = handlers.get(insn.value) + + if handler is None: + raise UcError(UC_ERR_ARG) + + fptr = handler() + + return getattr(self, '_Uc__do_hook_add')(htype, fptr, begin, end, insn) + + @staticmethod + def __select_reg_class(reg_id: int): + """Select class for special architectural registers. + """ + + reg_class = ( + (UcIntel.REG_RANGE_MMR, UcRegMMR), + (UcIntel.REG_RANGE_FP, UcRegFPR), + (UcIntel.REG_RANGE_XMM, UcReg128), + (UcIntel.REG_RANGE_YMM, UcReg256), + (UcIntel.REG_RANGE_ZMM, UcReg512) + ) + + return next((cls for rng, cls in reg_class if reg_id in rng), None) + + def reg_read(self, reg_id: int, aux: Any = None): + # select register class for special cases + reg_cls = UcIntel.__select_reg_class(reg_id) + + if reg_cls is None: + # backward compatibility: msr read through reg_read + if reg_id == const.UC_X86_REG_MSR: + if type(aux) is not int: + raise UcError(UC_ERR_ARG) + + value = self.msr_read(aux) + + else: + value = super().reg_read(reg_id, aux) + else: + value = self._reg_read(reg_id, reg_cls) + + return value + + def reg_write(self, reg_id: int, value) -> None: + # select register class for special cases + reg_cls = UcIntel.__select_reg_class(reg_id) + + if reg_cls is None: + # backward compatibility: msr write through reg_write + if reg_id == const.UC_X86_REG_MSR: + if type(value) is not tuple or len(value) != 2: + raise UcError(UC_ERR_ARG) + + self.msr_write(*value) + return + + super().reg_write(reg_id, value) + else: + self._reg_write(reg_id, reg_cls, value) + + def msr_read(self, msr_id: int) -> int: + return self._reg_read(const.UC_X86_REG_MSR, UcRegMSR, msr_id) + + def msr_write(self, msr_id: int, value: int) -> None: + self._reg_write(const.UC_X86_REG_MSR, UcRegMSR, (msr_id, value)) diff --git a/bindings/python/unicorn/arch/types.py b/bindings/python/unicorn/arch/types.py new file mode 100644 index 00000000..bd2a2be2 --- /dev/null +++ b/bindings/python/unicorn/arch/types.py @@ -0,0 +1,92 @@ +# Common types and structures. +# +# @author elicn + +from abc import abstractmethod +from typing import Generic, Tuple, TypeVar + +import ctypes + +uc_err = ctypes.c_int +uc_engine = ctypes.c_void_p +uc_context = ctypes.c_void_p +uc_hook_h = ctypes.c_size_t + + +VT = TypeVar('VT', bound=Tuple[int, ...]) + + +class UcReg(ctypes.Structure): + """A base class for composite registers. + + This class is meant to be inherited, not instantiated directly. + """ + + @property + @abstractmethod + def value(self): + """Get register value. + """ + + pass + + @classmethod + @abstractmethod + def from_value(cls, value): + """Create a register instance from a given value. + """ + + pass + + +class UcTupledReg(UcReg, Generic[VT]): + """A base class for registers whose values are represented as a set + of fields. + + This class is meant to be inherited, not instantiated directly. + """ + + @property + def value(self) -> VT: + return tuple(getattr(self, fname) for fname, *_ in self.__class__._fields_) # type: ignore + + @classmethod + def from_value(cls, value: VT): + assert type(value) is tuple and len(value) == len(cls._fields_) + + return cls(*value) + + +class UcLargeReg(UcReg): + """A base class for large registers that are internally represented as + an array of multiple qwords. + + This class is meant to be inherited, not instantiated directly. + """ + + qwords: ctypes.Array + + @property + def value(self) -> int: + return sum(qword << (64 * i) for i, qword in enumerate(self.qwords)) + + @classmethod + def from_value(cls, value: int): + assert type(value) is int + + mask = (1 << 64) - 1 + size = cls._fields_[0][1]._length_ + + return cls(tuple((value >> (64 * i)) & mask for i in range(size))) + + +class UcReg128(UcLargeReg): + _fields_ = [('qwords', ctypes.c_uint64 * 2)] + + +class UcReg256(UcLargeReg): + _fields_ = [('qwords', ctypes.c_uint64 * 4)] + + +class UcReg512(UcLargeReg): + _fields_ = [('qwords', ctypes.c_uint64 * 8)] diff --git a/bindings/python/unicorn/unicorn.py b/bindings/python/unicorn/unicorn.py index 7313bd98..17093835 100644 --- a/bindings/python/unicorn/unicorn.py +++ b/bindings/python/unicorn/unicorn.py @@ -1,977 +1,1099 @@ -# Unicorn Python bindings, by Nguyen Anh Quynnh +# New and improved Unicorn Python bindings by elicn +# based on Nguyen Anh Quynnh's work + from __future__ import annotations +from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Mapping, MutableMapping, Optional, Sequence, Tuple, Type, TypeVar + import ctypes -import ctypes.util -import distutils.sysconfig -from functools import wraps -from typing import Any, Callable, List, Tuple, Union -import pkg_resources -import inspect -import os.path -import sys -import weakref import functools -from collections import namedtuple +import weakref -from . import x86_const, arm_const, arm64_const, unicorn_const as uc +from . import unicorn_const as uc +from .arch.types import * -if not hasattr(sys.modules[__name__], "__file__"): - __file__ = inspect.getfile(inspect.currentframe()) +__version__ = f'{uc.UC_VERSION_MAJOR}.{uc.UC_VERSION_MINOR}.{uc.UC_VERSION_PATCH}' -_python2 = sys.version_info[0] < 3 -if _python2: - range = xrange - -_lib = { 'darwin': 'libunicorn.2.dylib', - 'win32': 'unicorn.dll', - 'cygwin': 'cygunicorn.dll', - 'linux': 'libunicorn.so.2', - 'linux2': 'libunicorn.so.2' } - - -# Windows DLL in dependency order -_all_windows_dlls = ( - "libwinpthread-1.dll", - "libgcc_s_seh-1.dll", - "libgcc_s_dw2-1.dll", -) - -_loaded_windows_dlls = set() - -def _load_win_support(path): - for dll in _all_windows_dlls: - if dll in _loaded_windows_dlls: - continue - - lib_file = os.path.join(path, dll) - if ('/' not in path and '\\' not in path) or os.path.exists(lib_file): - try: - #print('Trying to load Windows library', lib_file) - ctypes.cdll.LoadLibrary(lib_file) - #print('SUCCESS') - _loaded_windows_dlls.add(dll) - except OSError as e: - #print('FAIL to load %s' %lib_file, e) - continue - -# Initial attempt: load all dlls globally -if sys.platform in ('win32', 'cygwin'): - _load_win_support('') - -def _load_lib(path, lib_name): - try: - if sys.platform in ('win32', 'cygwin'): - _load_win_support(path) - - lib_file = os.path.join(path, lib_name) - dll = ctypes.cdll.LoadLibrary(lib_file) - #print('SUCCESS') - return dll - except OSError as e: - #print('FAIL to load %s' %lib_file, e) - return None - -_uc = None - -# Loading attempts, in order -# - user-provided environment variable -# - pkg_resources can get us the path to the local libraries -# - we can get the path to the local libraries by parsing our filename -# - global load -# - python's lib directory -# - last-gasp attempt at some hardcoded paths on darwin and linux - -_path_list = [os.getenv('LIBUNICORN_PATH', None), - pkg_resources.resource_filename(__name__, 'lib'), - os.path.join(os.path.split(__file__)[0], 'lib'), - '', - distutils.sysconfig.get_python_lib(), - "/usr/local/lib/" if sys.platform == 'darwin' else '/usr/lib64', - os.getenv('PATH', '')] - -#print(_path_list) -#print("-" * 80) - -for _path in _path_list: - if _path is None: continue - _uc = _load_lib(_path, _lib.get(sys.platform, "libunicorn.so.2")) - if _uc is not None: - break - -# Try to search old unicorn1 library without SONAME -if _uc is None: - for _path in _path_list: - if _path is None: - continue - - _uc = _load_lib(_path, "libunicorn.so") - if _uc is not None: - # In this case, show a warning for users - print("Found an old style dynamic library libunicorn.so, consider checking your installation", file=sys.stderr) - break - -if _uc is None: - raise ImportError("ERROR: fail to load the dynamic library.") - -__version__ = "%u.%u.%u" % (uc.UC_VERSION_MAJOR, uc.UC_VERSION_MINOR, uc.UC_VERSION_PATCH) - -# setup all the function prototype -def _setup_prototype(lib, fname, restype, *argtypes): - try: - getattr(lib, fname).restype = restype - getattr(lib, fname).argtypes = argtypes - except AttributeError: - raise ImportError("ERROR: Fail to setup some function prototypes. Make sure you have cleaned your unicorn1 installation.") - -ucerr = ctypes.c_int -uc_mode = ctypes.c_int -uc_arch = ctypes.c_int -uc_engine = ctypes.c_void_p -uc_context = ctypes.c_void_p -uc_hook_h = ctypes.c_size_t - -def _structure_repr(self): - return "%s(%s)" % (self.__class__.__name__, ", ".join("%s=%s" % (k, getattr(self, k)) for (k, _) in self._fields_)) class _uc_mem_region(ctypes.Structure): - _fields_ = [ - ("begin", ctypes.c_uint64), - ("end", ctypes.c_uint64), - ("perms", ctypes.c_uint32), - ] + _fields_ = ( + ('begin', ctypes.c_uint64), + ('end', ctypes.c_uint64), + ('perms', ctypes.c_uint32), + ) + + @property + def value(self) -> Tuple[int, int, int]: + return tuple(getattr(self, fname) for fname, _ in self._fields_) - __repr__ = _structure_repr class uc_tb(ctypes.Structure): - """"TranslationBlock""" - _fields_ = [ - ("pc", ctypes.c_uint64), - ("icount", ctypes.c_uint16), - ("size", ctypes.c_uint16) - ] + """"TranslationBlock + """ - __repr__ = _structure_repr - -_setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)) -_setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int) -_setup_prototype(_uc, "uc_open", ucerr, ctypes.c_uint, ctypes.c_uint, ctypes.POINTER(uc_engine)) -_setup_prototype(_uc, "uc_close", ucerr, uc_engine) -_setup_prototype(_uc, "uc_strerror", ctypes.c_char_p, ucerr) -_setup_prototype(_uc, "uc_errno", ucerr, uc_engine) -_setup_prototype(_uc, "uc_reg_read", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p) -_setup_prototype(_uc, "uc_reg_write", ucerr, uc_engine, ctypes.c_int, ctypes.c_void_p) -_setup_prototype(_uc, "uc_mem_read", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t) -_setup_prototype(_uc, "uc_mem_write", ucerr, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t) -_setup_prototype(_uc, "uc_emu_start", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_size_t) -_setup_prototype(_uc, "uc_emu_stop", ucerr, uc_engine) -_setup_prototype(_uc, "uc_hook_del", ucerr, uc_engine, uc_hook_h) -_setup_prototype(_uc, "uc_mmio_map", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p) -_setup_prototype(_uc, "uc_mem_map", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32) -_setup_prototype(_uc, "uc_mem_map_ptr", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p) -_setup_prototype(_uc, "uc_mem_unmap", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t) -_setup_prototype(_uc, "uc_mem_protect", ucerr, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32) -_setup_prototype(_uc, "uc_query", ucerr, uc_engine, ctypes.c_uint32, ctypes.POINTER(ctypes.c_size_t)) -_setup_prototype(_uc, "uc_context_alloc", ucerr, uc_engine, ctypes.POINTER(uc_context)) -_setup_prototype(_uc, "uc_free", ucerr, ctypes.c_void_p) -_setup_prototype(_uc, "uc_context_save", ucerr, uc_engine, uc_context) -_setup_prototype(_uc, "uc_context_restore", ucerr, uc_engine, uc_context) -_setup_prototype(_uc, "uc_context_size", ctypes.c_size_t, uc_engine) -_setup_prototype(_uc, "uc_context_reg_read", ucerr, uc_context, ctypes.c_int, ctypes.c_void_p) -_setup_prototype(_uc, "uc_context_reg_write", ucerr, uc_context, ctypes.c_int, ctypes.c_void_p) -_setup_prototype(_uc, "uc_context_free", ucerr, uc_context) -_setup_prototype(_uc, "uc_mem_regions", ucerr, uc_engine, ctypes.POINTER(ctypes.POINTER(_uc_mem_region)), ctypes.POINTER(ctypes.c_uint32)) -# https://bugs.python.org/issue42880 -_setup_prototype(_uc, "uc_hook_add", ucerr, uc_engine, ctypes.POINTER(uc_hook_h), ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64) -_setup_prototype(_uc, "uc_ctl", ucerr, uc_engine, ctypes.c_int) - -UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p) -UC_HOOK_INSN_INVALID_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_void_p) -UC_HOOK_MEM_INVALID_CB = ctypes.CFUNCTYPE( - ctypes.c_bool, uc_engine, ctypes.c_int, - ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p -) -UC_HOOK_MEM_ACCESS_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.c_int, - ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p -) -UC_HOOK_INTR_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.c_uint32, ctypes.c_void_p -) -UC_HOOK_INSN_IN_CB = ctypes.CFUNCTYPE( - ctypes.c_uint32, uc_engine, ctypes.c_uint32, ctypes.c_int, ctypes.c_void_p -) -UC_HOOK_INSN_OUT_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.c_uint32, - ctypes.c_int, ctypes.c_uint32, ctypes.c_void_p -) -UC_HOOK_INSN_SYSCALL_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_void_p) -UC_HOOK_INSN_SYS_CB = ctypes.CFUNCTYPE(ctypes.c_uint32, uc_engine, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) -UC_HOOK_INSN_CPUID_CB = ctypes.CFUNCTYPE(ctypes.c_uint32, uc_engine, ctypes.c_void_p) -UC_MMIO_READ_CB = ctypes.CFUNCTYPE( - ctypes.c_uint64, uc_engine, ctypes.c_uint64, ctypes.c_int, ctypes.c_void_p -) -UC_MMIO_WRITE_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.c_uint64, ctypes.c_int, ctypes.c_uint64, ctypes.c_void_p -) -UC_HOOK_EDGE_GEN_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.POINTER(uc_tb), ctypes.POINTER(uc_tb), ctypes.c_void_p -) -UC_HOOK_TCG_OPCODE_CB = ctypes.CFUNCTYPE( - None, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p -) - -# access to error code via @errno of UcError -class UcError(Exception): - def __init__(self, errno): - self.errno = errno - - def __str__(self): - return _uc.uc_strerror(self.errno).decode('ascii') - - -# return the core's version -def uc_version(): - major = ctypes.c_int() - minor = ctypes.c_int() - combined = _uc.uc_version(ctypes.byref(major), ctypes.byref(minor)) - return (major.value, minor.value, combined) - - -# return the binding's version -def version_bind(): - return ( - uc.UC_API_MAJOR, uc.UC_API_MINOR, - (uc.UC_API_MAJOR << 8) + uc.UC_API_MINOR, + _fields_ = ( + ('pc', ctypes.c_uint64), + ('icount', ctypes.c_uint16), + ('size', ctypes.c_uint16) ) -# check to see if this engine supports a particular arch -def uc_arch_supported(query): - return _uc.uc_arch_supported(query) +def __load_uc_lib() -> ctypes.CDLL: + from pathlib import Path, PurePath -ARMCPReg = Tuple[int, int, int, int, int, int, int] -ARM64CPReg = Tuple[int, int, int, int, int] -ARMCPRegValue = Tuple[int, int, int, int, int, int, int, int] -ARM64CPRegValue = Tuple[int, int, int, int, int, int] -X86MMRReg = Tuple[int, int, int, int] -X86FPReg = Tuple[int, int] + import inspect + import os + import pkg_resources + import sys -# uc_reg_read/write and uc_context_reg_read/write. -def reg_read(reg_read_func, arch, reg_id, opt=None): - if arch == uc.UC_ARCH_X86: - if reg_id in [x86_const.UC_X86_REG_IDTR, x86_const.UC_X86_REG_GDTR, x86_const.UC_X86_REG_LDTR, x86_const.UC_X86_REG_TR]: - reg = uc_x86_mmr() - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.selector, reg.base, reg.limit, reg.flags - if reg_id in range(x86_const.UC_X86_REG_FP0, x86_const.UC_X86_REG_FP0+8): - reg = uc_x86_float80() - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.mantissa, reg.exponent - if reg_id in range(x86_const.UC_X86_REG_XMM0, x86_const.UC_X86_REG_XMM0+8): - reg = uc_x86_xmm() - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.low_qword | (reg.high_qword << 64) - if reg_id in range(x86_const.UC_X86_REG_YMM0, x86_const.UC_X86_REG_YMM0+16): - reg = uc_x86_ymm() - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.first_qword | (reg.second_qword << 64) | (reg.third_qword << 128) | (reg.fourth_qword << 192) - if reg_id is x86_const.UC_X86_REG_MSR: - if opt is None: - raise UcError(uc.UC_ERR_ARG) - reg = uc_x86_msr() - reg.rid = opt - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.value + loaded_dlls = set() - if arch == uc.UC_ARCH_ARM: - if reg_id == arm_const.UC_ARM_REG_CP_REG: - reg = uc_arm_cp_reg() - if not isinstance(opt, tuple) or len(opt) != 7: - raise UcError(uc.UC_ERR_ARG) - reg.cp, reg.is64, reg.sec, reg.crn, reg.crm, reg.opc1, reg.opc2 = opt - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.val + def __load_win_support(path: Path) -> None: + # Windows DLL in dependency order + all_dlls = ( + 'libwinpthread-1.dll', + 'libgcc_s_seh-1.dll', + 'libgcc_s_dw2-1.dll' + ) - if arch == uc.UC_ARCH_ARM64: - if reg_id == arm64_const.UC_ARM64_REG_CP_REG: - reg = uc_arm64_cp_reg() - if not isinstance(opt, tuple) or len(opt) != 5: - raise UcError(uc.UC_ERR_ARG) - reg.crn, reg.crm, reg.op0, reg.op1, reg.op2 = opt - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.val + for dllname in all_dlls: + if dllname not in loaded_dlls: + lib_file = path / dllname - elif reg_id in range(arm64_const.UC_ARM64_REG_Q0, arm64_const.UC_ARM64_REG_Q31+1) or reg_id in range(arm64_const.UC_ARM64_REG_V0, arm64_const.UC_ARM64_REG_V31+1): - reg = uc_arm64_neon128() - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.low_qword | (reg.high_qword << 64) + if str(path.parent) == '.' or lib_file.exists(): + try: + ctypes.cdll.LoadLibrary(str(lib_file)) + except OSError: + continue + else: + loaded_dlls.add(dllname) - # read to 64bit number to be safe - reg = ctypes.c_uint64(0) - status = reg_read_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return reg.value + platform = sys.platform -def reg_write(reg_write_func, arch, reg_id, value): - reg = None + # Initial attempt: load all dlls globally + if platform in ('win32', 'cygwin'): + __load_win_support(Path()) - if arch == uc.UC_ARCH_X86: - if reg_id in [x86_const.UC_X86_REG_IDTR, x86_const.UC_X86_REG_GDTR, x86_const.UC_X86_REG_LDTR, x86_const.UC_X86_REG_TR]: - assert isinstance(value, tuple) and len(value) == 4 - reg = uc_x86_mmr() - reg.selector = value[0] - reg.base = value[1] - reg.limit = value[2] - reg.flags = value[3] - if reg_id in range(x86_const.UC_X86_REG_FP0, x86_const.UC_X86_REG_FP0+8): - reg = uc_x86_float80() - reg.mantissa = value[0] - reg.exponent = value[1] - if reg_id in range(x86_const.UC_X86_REG_XMM0, x86_const.UC_X86_REG_XMM0+8): - reg = uc_x86_xmm() - reg.low_qword = value & 0xffffffffffffffff - reg.high_qword = value >> 64 - if reg_id in range(x86_const.UC_X86_REG_YMM0, x86_const.UC_X86_REG_YMM0+16): - reg = uc_x86_ymm() - reg.first_qword = value & 0xffffffffffffffff - reg.second_qword = (value >> 64) & 0xffffffffffffffff - reg.third_qword = (value >> 128) & 0xffffffffffffffff - reg.fourth_qword = value >> 192 - if reg_id is x86_const.UC_X86_REG_MSR: - reg = uc_x86_msr() - reg.rid = value[0] - reg.value = value[1] + def _load_lib(path: Path, lib_name: str): + if platform in ('win32', 'cygwin'): + __load_win_support(path) - if arch == uc.UC_ARCH_ARM64: - if reg_id in range(arm64_const.UC_ARM64_REG_Q0, arm64_const.UC_ARM64_REG_Q31+1) or reg_id in range(arm64_const.UC_ARM64_REG_V0, arm64_const.UC_ARM64_REG_V31+1): - reg = uc_arm64_neon128() - reg.low_qword = value & 0xffffffffffffffff - reg.high_qword = value >> 64 - elif reg_id == arm64_const.UC_ARM64_REG_CP_REG: - reg = uc_arm64_cp_reg() - if not isinstance(value, tuple) or len(value) != 6: - raise UcError(uc.UC_ERR_ARG) - reg.crn, reg.crm, reg.op0, reg.op1, reg.op2, reg.val = value + lib_file = path / lib_name - if arch == uc.UC_ARCH_ARM: - if reg_id == arm_const.UC_ARM_REG_CP_REG: - reg = uc_arm_cp_reg() - if not isinstance(value, tuple) or len(value) != 8: - raise UcError(uc.UC_ERR_ARG) - reg.cp, reg.is64, reg.sec, reg.crn, reg.crm, reg.opc1, reg.opc2, reg.val = value - - if reg is None: - # convert to 64bit number to be safe - reg = ctypes.c_uint64(value) - - status = reg_write_func(reg_id, ctypes.byref(reg)) - if status != uc.UC_ERR_OK: - raise UcError(status) - - return - -def _catch_hook_exception(func): - @wraps(func) - def wrapper(self, *args, **kwargs): - """Catches exceptions raised in hook functions. - - If an exception is raised, it is saved to the Uc object and a call to stop - emulation is issued. - """ try: - return func(self, *args, **kwargs) - except Exception as e: - # If multiple hooks raise exceptions, just use the first one - if self._hook_exception is None: - self._hook_exception = e + return ctypes.cdll.LoadLibrary(str(lib_file)) + except OSError: + return None - self.emu_stop() + # Loading attempts, in order + # - user-provided environment variable + # - pkg_resources can get us the path to the local libraries + # - we can get the path to the local libraries by parsing our filename + # - global load + # - python's lib directory - return wrapper + lib_locations = [ + os.getenv('LIBUNICORN_PATH'), + pkg_resources.resource_filename(__name__, 'lib'), + PurePath(inspect.getfile(__load_uc_lib)).parent / 'lib', + '' + ] + [PurePath(p) / 'unicorn' / 'lib' for p in sys.path] + + # filter out None elements + lib_locations = tuple(Path(loc) for loc in lib_locations if loc is not None) + + lib_name = { + 'cygwin' : 'cygunicorn.dll', + 'darwin' : 'libunicorn.2.dylib', + 'linux' : 'libunicorn.so.2', + 'win32' : 'unicorn.dll' + }.get(platform, "libunicorn.so") + + def __attempt_load(libname: str): + T = TypeVar('T') + + def __pick_first_valid(iter: Iterable[T]) -> Optional[T]: + """Iterate till encountering a non-None element + """ + + return next((elem for elem in iter if elem is not None), None) + + return __pick_first_valid(_load_lib(loc, libname) for loc in lib_locations) + + lib = __attempt_load(lib_name) or __attempt_load('libunicorn.so') + + if lib is None: + raise ImportError('Failed to load the Unicorn dynamic library') + + return lib -class uc_arm_cp_reg(ctypes.Structure): - """ARM coprocessors registers for instructions MRC, MCR, MRRC, MCRR""" - _fields_ = [ - ("cp", ctypes.c_uint32), - ("is64", ctypes.c_uint32), - ("sec", ctypes.c_uint32), - ("crn", ctypes.c_uint32), - ("crm", ctypes.c_uint32), - ("opc1", ctypes.c_uint32), - ("opc2", ctypes.c_uint32), - ("val", ctypes.c_uint64) - ] +def __set_lib_prototypes(lib: ctypes.CDLL) -> None: + """Set up library functions prototypes. - __repr__ = _structure_repr + Args: + lib: unicorn library instance + """ -class uc_arm64_cp_reg(ctypes.Structure): - """ARM64 coprocessors registers for instructions MRS, MSR""" - _fields_ = [ - ("crn", ctypes.c_uint32), - ("crm", ctypes.c_uint32), - ("op0", ctypes.c_uint32), - ("op1", ctypes.c_uint32), - ("op2", ctypes.c_uint32), - ("val", ctypes.c_uint64) - ] + def __set_prototype(fname: str, restype: Type[ctypes._CData], *argtypes: Type[ctypes._CData]) -> None: + func: Optional[ctypes._FuncPointer] = getattr(lib, fname, None) - __repr__ = _structure_repr + if func is None: + raise ImportError('Failed to setup function prototypes; make sure you have cleaned your unicorn1 installation') -class uc_x86_mmr(ctypes.Structure): - """Memory-Management Register for instructions IDTR, GDTR, LDTR, TR.""" - _fields_ = [ - ("selector", ctypes.c_uint16), # not used by GDTR and IDTR - ("base", ctypes.c_uint64), # handle 32 or 64 bit CPUs - ("limit", ctypes.c_uint32), - ("flags", ctypes.c_uint32), # not used by GDTR and IDTR - ] + func.restype = restype + func.argtypes = argtypes - __repr__ = _structure_repr + __set_prototype('uc_version', ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)) + __set_prototype('uc_arch_supported', ctypes.c_bool, ctypes.c_int) + __set_prototype('uc_open', uc_err, ctypes.c_uint, ctypes.c_uint, ctypes.POINTER(uc_engine)) + __set_prototype('uc_close', uc_err, uc_engine) + __set_prototype('uc_strerror', ctypes.c_char_p, uc_err) + __set_prototype('uc_errno', uc_err, uc_engine) + __set_prototype('uc_reg_read', uc_err, uc_engine, ctypes.c_int, ctypes.c_void_p) + __set_prototype('uc_reg_write', uc_err, uc_engine, ctypes.c_int, ctypes.c_void_p) + __set_prototype('uc_mem_read', uc_err, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t) + __set_prototype('uc_mem_write', uc_err, uc_engine, ctypes.c_uint64, ctypes.POINTER(ctypes.c_char), ctypes.c_size_t) + __set_prototype('uc_emu_start', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_size_t) + __set_prototype('uc_emu_stop', uc_err, uc_engine) + __set_prototype('uc_hook_del', uc_err, uc_engine, uc_hook_h) + __set_prototype('uc_mmio_map', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p) + __set_prototype('uc_mem_map', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32) + __set_prototype('uc_mem_map_ptr', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p) + __set_prototype('uc_mem_unmap', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_size_t) + __set_prototype('uc_mem_protect', uc_err, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_uint32) + __set_prototype('uc_query', uc_err, uc_engine, ctypes.c_uint32, ctypes.POINTER(ctypes.c_size_t)) + __set_prototype('uc_context_alloc', uc_err, uc_engine, ctypes.POINTER(uc_context)) + __set_prototype('uc_free', uc_err, ctypes.c_void_p) + __set_prototype('uc_context_save', uc_err, uc_engine, uc_context) + __set_prototype('uc_context_restore', uc_err, uc_engine, uc_context) + __set_prototype('uc_context_size', ctypes.c_size_t, uc_engine) + __set_prototype('uc_context_reg_read', uc_err, uc_context, ctypes.c_int, ctypes.c_void_p) + __set_prototype('uc_context_reg_write', uc_err, uc_context, ctypes.c_int, ctypes.c_void_p) + __set_prototype('uc_context_free', uc_err, uc_context) + __set_prototype('uc_mem_regions', uc_err, uc_engine, ctypes.POINTER(ctypes.POINTER(_uc_mem_region)), ctypes.POINTER(ctypes.c_uint32)) + # https://bugs.python.org/issue42880 + __set_prototype('uc_hook_add', uc_err, uc_engine, ctypes.POINTER(uc_hook_h), ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64) + __set_prototype('uc_ctl', uc_err, uc_engine, ctypes.c_int) -class uc_x86_msr(ctypes.Structure): - _fields_ = [ - ("rid", ctypes.c_uint32), - ("value", ctypes.c_uint64), - ] - __repr__ = _structure_repr +uclib = __load_uc_lib() +__set_lib_prototypes(uclib) -class uc_x86_float80(ctypes.Structure): - """Float80""" - _fields_ = [ - ("mantissa", ctypes.c_uint64), - ("exponent", ctypes.c_uint16), - ] - __repr__ = _structure_repr +# native hook callback signatures +HOOK_INTR_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint32, ctypes.c_void_p) +HOOK_CODE_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p) +HOOK_MEM_INVALID_CFUNC = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_int, ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p) +HOOK_MEM_ACCESS_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_int, ctypes.c_uint64, ctypes.c_int, ctypes.c_int64, ctypes.c_void_p) +HOOK_INSN_INVALID_CFUNC = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_void_p) +HOOK_EDGE_GEN_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.POINTER(uc_tb), ctypes.POINTER(uc_tb), ctypes.c_void_p) +HOOK_TCG_OPCODE_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p) -class uc_x86_xmm(ctypes.Structure): - """128-bit xmm register""" - _fields_ = [ - ("low_qword", ctypes.c_uint64), - ("high_qword", ctypes.c_uint64), - ] +# mmio callback signatures +MMIO_READ_CFUNC = ctypes.CFUNCTYPE(ctypes.c_uint64, uc_engine, ctypes.c_uint64, ctypes.c_uint, ctypes.c_void_p) +MMIO_WRITE_CFUNC = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_uint, ctypes.c_uint64, ctypes.c_void_p) - __repr__ = _structure_repr -class uc_x86_ymm(ctypes.Structure): - """256-bit ymm register""" - _fields_ = [ - ("first_qword", ctypes.c_uint64), - ("second_qword", ctypes.c_uint64), - ("third_qword", ctypes.c_uint64), - ("fourth_qword", ctypes.c_uint64), - ] +class UcError(Exception): + """Unicorn base exception. - __repr__ = _structure_repr + Error context is specified through `errno` and `args`. + """ -class uc_arm64_neon128(ctypes.Structure): - """128-bit neon register""" - _fields_ = [ - ("low_qword", ctypes.c_uint64), - ("high_qword", ctypes.c_uint64), - ] + def __init__(self, errno: int, *args): + super().__init__(*args) - __repr__ = _structure_repr + self.errno = errno -# Subclassing ref to allow property assignment. -class UcRef(weakref.ref): - pass + def __str__(self) -> str: + return uclib.uc_strerror(self.errno).decode('ascii') -# This class tracks Uc instance destruction and releases handles. -class UcCleanupManager(object): - def __init__(self): - self._refs = {} - def register(self, uc): - ref = UcRef(uc, self._finalizer) - ref._uch = uc._uch - ref._class = uc.__class__ - self._refs[id(ref)] = ref +def uc_version() -> Tuple[int, int, int]: + """Retrieve Unicorn library version. - def _finalizer(self, ref): - # note: this method must be completely self-contained and cannot have any references - # to anything else in this module. - # - # This is because it may be called late in the Python interpreter's shutdown phase, at - # which point the module's variables may already have been deinitialized and set to None. - # - # Not respecting that can lead to errors such as: - # Exception AttributeError: - # "'NoneType' object has no attribute 'release_handle'" - # in > ignored - # - # For that reason, we do not try to access the `Uc` class directly here but instead use - # the saved `._class` reference. - del self._refs[id(ref)] - ref._class.release_handle(ref._uch) + Returns: a tuple containing major, minor and a combined verion number + """ -class Uc(object): - _cleanup = UcCleanupManager() + major = ctypes.c_int() + minor = ctypes.c_int() - def __init__(self, arch: int, mode: int): + combined = uclib.uc_version( + ctypes.byref(major), + ctypes.byref(minor) + ) + + return (major.value, minor.value, combined) + + +def version_bind() -> Tuple[int, int, int]: + """Retrieve Unicorn bindings version. + + Returns: a tuple containing major, minor and a combined verion number + """ + + major = uc.UC_API_MAJOR + minor = uc.UC_API_MINOR + + combined = (major << 8) + minor + + return (major, minor, combined) + + +def uc_arch_supported(atype: int) -> bool: + """Check whether Unicorn library supports a particular arch. + """ + + return bool(uclib.uc_arch_supported(atype)) + + +def debug() -> str: + """Get verbose verion string. + """ + + archs = ( + ('arm', uc.UC_ARCH_ARM), + ('arm64', uc.UC_ARCH_ARM64), + ('mips', uc.UC_ARCH_MIPS), + ('x86', uc.UC_ARCH_X86), + ('ppc', uc.UC_ARCH_PPC), + ('sparc', uc.UC_ARCH_SPARC), + ('m68k', uc.UC_ARCH_M68K), + ('riscv', uc.UC_ARCH_RISCV), + ('s390x', uc.UC_ARCH_S390X), + ('tricore', uc.UC_ARCH_TRICORE) + ) + + all_archs = ''.join(f'-{name}' for name, atype in archs if uc_arch_supported(atype)) + lib_maj, lib_min, _ = uc_version() + bnd_maj, bnd_min, _ = version_bind() + + return f'python-{all_archs}-c{lib_maj}.{lib_min}-b{bnd_maj}.{bnd_min}' + + +if TYPE_CHECKING: + # _FuncPointer is not recognized at runtime; use it only for type annotation + _CFP = TypeVar('_CFP', bound=ctypes._FuncPointer) + + +def uccallback(functype: Type[_CFP]): + """Unicorn callback decorator. + + Wraps a Python function meant to be dispatched by Unicorn as a hook callback. + The function call is wrapped with an exception guard to catch and record + exceptions thrown during hook handling. + + If an exception occurs, it is saved to the Uc object and emulation is stopped. + """ + + def decorate(func) -> _CFP: + + @functools.wraps(func) + def wrapper(uc: Uc, *args, **kwargs): + try: + return func(uc, *args, **kwargs) + except Exception as e: + # If multiple hooks raise exceptions, just use the first one + if uc._hook_exception is None: + uc._hook_exception = e + + uc.emu_stop() + + return ctypes.cast(functype(wrapper), functype) + + return decorate + + +class RegStateManager: + """Registers state manager. + + Designed as a mixin class; not to be instantiated directly. + Some methods must be implemented by mixin instances + """ + + _DEFAULT_REGTYPE = ctypes.c_uint64 + + def _do_reg_read(self, reg_id: int, reg_obj) -> int: + """Private register read implementation. + Must be implemented by the mixin object + """ + + raise NotImplementedError + + def _do_reg_write(self, reg_id: int, reg_obj) -> int: + """Private register write implementation. + Must be implemented by the mixin object + """ + + raise NotImplementedError + + def _reg_read(self, reg_id: int, regtype, *args): + """Register read helper method. + """ + + reg = regtype(*args) + status = self._do_reg_read(reg_id, ctypes.byref(reg)) + + if status != uc.UC_ERR_OK: + raise UcError(status, reg_id) + + return reg.value + + def _reg_write(self, reg_id: int, regtype: Type, value) -> None: + """Register write helper method. + """ + + reg = regtype.from_value(value) if issubclass(regtype, UcReg) else regtype(value) + status = self._do_reg_write(reg_id, ctypes.byref(reg)) + + if status != uc.UC_ERR_OK: + raise UcError(status, reg_id) + + def reg_read(self, reg_id: int, aux: Any = None): + """Read architectural register value. + + Args: + reg_id : register identifier (architecture-specific enumeration) + aux : auxiliary data (register specific) + + Returns: register value (register-specific format) + + Raises: `UcError` in case of invalid register id or auxiliary data + """ + + return self._reg_read(reg_id, self._DEFAULT_REGTYPE) + + def reg_write(self, reg_id: int, value) -> None: + """Write to architectural register. + + Args: + reg_id : register identifier (architecture-specific enumeration) + value : value to write (register-specific format) + + Raises: `UcError` in case of invalid register id or value format + """ + + self._reg_write(reg_id, self._DEFAULT_REGTYPE, value) + + +def ucsubclass(cls): + """Uc subclass decorator. + + Use it to decorate user-defined Uc subclasses. + + Example: + >>> @ucsubclass + ... class Pegasus(Uc): + ... '''A Unicorn impl with wings + ... ''' + ... pass + """ + + # to maintain proper inheritance for user-defined Uc subclasses, the Uc + # base class is replaced with the appropriate Uc pre-defined subclass on + # first instantiation. + # + # for example, if the Pegasus class from the example above instantiates + # with Intel arch and 64-bit mode, the Pegasus class will be modified to + # inherit from UcIntel and only then Uc, instead of Uc directly. that is: + # Pegasus -> UcIntel -> Uc -> RegStateManager -> object + # + # note that all Pegasus subclasses will have the same inheritence chain, + # regardless of the arch and mode the might use to initialize. + + def __replace(seq: Tuple, item, repl) -> Tuple: + if item not in seq: + return seq + + i = seq.index(item) + + return seq[:i] + tuple([repl]) + seq[i + 1:] + + def __new_uc_subclass(cls, arch: int, mode: int): + # resolve the appropriate Uc subclass + subcls = Uc.__new__(cls, arch, mode) + + # set the resolved subclass as base class instead of Uc (if there) + cls.__bases__ = __replace(cls.__bases__, Uc, type(subcls)) + + return object.__new__(cls) + + setattr(cls, '__new__', __new_uc_subclass) + + return cls + + +class Uc(RegStateManager): + """Unicorn Engine class. + """ + + @staticmethod + def __is_compliant() -> bool: + """Checks whether Unicorn binding version complies with Unicorn library. + + Returns: `True` if versions match, `False` otherwise + """ + + uc_maj, uc_min, _ = uc_version() + bnd_maj, bnd_min, _ = version_bind() + + return (uc_maj, uc_min) == (bnd_maj, bnd_min) + + def __new__(cls, arch: int, mode: int, cpu: Optional[int] = None): # verify version compatibility with the core before doing anything - (major, minor, _combined) = uc_version() - # print("core version =", uc_version()) - # print("binding version =", uc.UC_API_MAJOR, uc.UC_API_MINOR) - if major != uc.UC_API_MAJOR or minor != uc.UC_API_MINOR: - self._uch = None - # our binding version is different from the core's API version + if not Uc.__is_compliant(): raise UcError(uc.UC_ERR_VERSION) - self._arch, self._mode = arch, mode - self._uch = ctypes.c_void_p() - status = _uc.uc_open(arch, mode, ctypes.byref(self._uch)) + import importlib + + def __uc_subclass(pkgname: str, clsname: str): + """Use a lazy subclass instantiation to avoid importing unnecessary arch + classes. + """ + + def __wrapped() -> Type[Uc]: + archmod = importlib.import_module(f'.arch.{pkgname}', f'unicorn') + + return getattr(archmod, clsname) + + return __wrapped + + def __uc_generic(): + return Uc + + wrapped: Callable[[], Type[Uc]] = { + uc.UC_ARCH_ARM : __uc_subclass('arm', 'UcAArch32'), + uc.UC_ARCH_ARM64 : __uc_subclass('arm64', 'UcAArch64'), + uc.UC_ARCH_MIPS : __uc_generic, + uc.UC_ARCH_X86 : __uc_subclass('intel', 'UcIntel'), + uc.UC_ARCH_PPC : __uc_generic, + uc.UC_ARCH_SPARC : __uc_generic, + uc.UC_ARCH_M68K : __uc_generic, + uc.UC_ARCH_RISCV : __uc_generic, + uc.UC_ARCH_S390X : __uc_generic, + uc.UC_ARCH_TRICORE : __uc_generic + }[arch] + + subclass = wrapped() + + # return the appropriate unicorn subclass type + return super(Uc, cls).__new__(subclass) + + def __init__(self, arch: int, mode: int, cpu: Optional[int] = None) -> None: + """Initialize a Unicorn engine instance. + + Args: + arch: emulated architecture identifier (see UC_ARCH_* constants) + mode: emulated processor mode (see UC_MODE_* constants) + cpu: emulated cpu model (see UC_CPU_* constants) [optional] + """ + + self._arch = arch + self._mode = mode + + # initialize the unicorn instance + self._uch = uc_engine() + status = uclib.uc_open(arch, mode, ctypes.byref(self._uch)) + if status != uc.UC_ERR_OK: self._uch = None raise UcError(status) - # internal mapping table to save callback & userdata - self._callbacks = {} - self._ctype_cbs = [] - self._callback_count = 0 - self._cleanup.register(self) - self._hook_exception = None # The exception raised in a hook + + if cpu is not None: + self.ctl_set_cpu_model(cpu) + + # we have to keep a reference to the callbacks so they do not get gc-ed + # see: https://docs.python.org/3/library/ctypes.html#callback-functions + self._callbacks: MutableMapping[int, ctypes._FuncPointer] = {} + self._mmio_callbacks: MutableMapping[Tuple[int, int], Tuple[Optional[MMIO_READ_CFUNC], Optional[MMIO_WRITE_CFUNC]]] = {} + + self._hook_exception: Optional[Exception] = None + + # create a finalizer object that will apropriately free up resources when + # this instance undergoes garbage collection. + self.__finalizer = weakref.finalize(self, Uc.release_handle, self._uch) @staticmethod - def release_handle(uch: ctypes.CDLL): + def release_handle(uch: uc_engine) -> None: + # this method and its arguments must not have any reference to the Uc instance being + # destroyed. namely, this method cannot be a bound method. + if uch: try: - status = _uc.uc_close(uch) - if status != uc.UC_ERR_OK: - raise UcError(status) - except: # _uc might be pulled from under our feet + status = uclib.uc_close(uch) + + # _uc might be pulled from under our feet + except: pass - # emulate from @begin, and stop when reaching address @until - def emu_start(self, begin: int, until: int, timeout: int=0, count: int=0) -> None: + else: + if status != uc.UC_ERR_OK: + raise UcError(status) + + ########################### + # Emulation controllers # + ########################### + + def emu_start(self, begin: int, until: int, timeout: int = 0, count: int = 0) -> None: + """Start emulation from a specified address to another. + + Args: + begin : emulation starting address + until : emulation ending address + timeout : limit emulation to a certain amount of time (milliseconds) + count : limit emulation to a certain amount of instructions + + Raises: + `UcError` : in case emulation could not be started properly + `Exception` : in case an error has been encountered during emulation + """ + self._hook_exception = None - status = _uc.uc_emu_start(self._uch, begin, until, timeout, count) + status = uclib.uc_emu_start(self._uch, begin, until, timeout, count) + if status != uc.UC_ERR_OK: raise UcError(status) if self._hook_exception is not None: raise self._hook_exception - # stop emulation def emu_stop(self) -> None: - status = _uc.uc_emu_stop(self._uch) - if status != uc.UC_ERR_OK: - raise UcError(status) + """Stop emulation. - # return the value of a register, for @opt parameter, specify int for x86 msr, tuple for arm cp/neon regs. - def reg_read(self, reg_id: int, opt: Union[None, int, ARMCPReg, ARM64CPReg]=None) -> Union[int, X86MMRReg, X86FPReg]: - return reg_read(functools.partial(_uc.uc_reg_read, self._uch), self._arch, reg_id, opt) + Raises: `UcError` in case emulation could not be stopped properly + """ - # write to a register, tuple for arm cp regs. - def reg_write(self, reg_id: int, value: Union[int, ARMCPRegValue, ARM64CPRegValue, X86MMRReg, X86FPReg]): - return reg_write(functools.partial(_uc.uc_reg_write, self._uch), self._arch, reg_id, value) - - # read from MSR - X86 only - def msr_read(self, msr_id: int): - return self.reg_read(x86_const.UC_X86_REG_MSR, msr_id) - - # write to MSR - X86 only - def msr_write(self, msr_id, value: int): - return self.reg_write(x86_const.UC_X86_REG_MSR, (msr_id, value)) - - # read data from memory - def mem_read(self, address: int, size: int): - data = ctypes.create_string_buffer(size) - status = _uc.uc_mem_read(self._uch, address, data, size) - if status != uc.UC_ERR_OK: - raise UcError(status) - return bytearray(data) - - # write to memory - def mem_write(self, address: int, data: bytes): - status = _uc.uc_mem_write(self._uch, address, data, len(data)) - if status != uc.UC_ERR_OK: - raise UcError(status) - - def _mmio_map_read_cb(self, handle, offset, size, user_data): - (cb, data) = self._callbacks[user_data] - return cb(self, offset, size, data) - - def _mmio_map_write_cb(self, handle, offset, size, value, user_data): - (cb, data) = self._callbacks[user_data] - cb(self, offset, size, value, data) - - def mmio_map(self, address: int, size: int, - read_cb: UC_MMIO_READ_TYPE, user_data_read: Any, - write_cb: UC_MMIO_WRITE_TYPE, user_data_write: Any): - internal_read_cb = ctypes.cast(UC_MMIO_READ_CB(self._mmio_map_read_cb), UC_MMIO_READ_CB) - internal_write_cb = ctypes.cast(UC_MMIO_WRITE_CB(self._mmio_map_write_cb), UC_MMIO_WRITE_CB) - - self._callback_count += 1 - self._callbacks[self._callback_count] = (read_cb, user_data_read) - read_count = self._callback_count - self._callback_count += 1 - self._callbacks[self._callback_count] = (write_cb, user_data_write) - write_count = self._callback_count - - status = _uc.uc_mmio_map(self._uch, address, size, internal_read_cb, read_count, internal_write_cb, write_count) - if status != uc.UC_ERR_OK: - raise UcError(status) - - # https://docs.python.org/3/library/ctypes.html#callback-functions - self._ctype_cbs.append(internal_read_cb) - self._ctype_cbs.append(internal_write_cb) - - # map a range of memory - def mem_map(self, address: int, size: int, perms: int=uc.UC_PROT_ALL): - status = _uc.uc_mem_map(self._uch, address, size, perms) - if status != uc.UC_ERR_OK: - raise UcError(status) - - # map a range of memory from a raw host memory address - def mem_map_ptr(self, address: int, size: int, perms: int, ptr: int): - status = _uc.uc_mem_map_ptr(self._uch, address, size, perms, ptr) - if status != uc.UC_ERR_OK: - raise UcError(status) - - # unmap a range of memory - def mem_unmap(self, address: int, size: int): - status = _uc.uc_mem_unmap(self._uch, address, size) - if status != uc.UC_ERR_OK: - raise UcError(status) - - # protect a range of memory - def mem_protect(self, address: int, size: int, perms: int=uc.UC_PROT_ALL): - status = _uc.uc_mem_protect(self._uch, address, size, perms) - if status != uc.UC_ERR_OK: - raise UcError(status) - - # return CPU mode at runtime - def query(self, query_mode: int): - result = ctypes.c_size_t(0) - status = _uc.uc_query(self._uch, query_mode, ctypes.byref(result)) - if status != uc.UC_ERR_OK: - raise UcError(status) - return result.value - - @_catch_hook_exception - def _hook_tcg_op_cb(self, handle, address, arg1, arg2, size, user_data): - (cb, data) = self._callbacks[user_data] - cb(self, address, arg1, arg2, size, user_data) - - @_catch_hook_exception - def _hook_edge_gen_cb(self, handle, cur, prev, user_data): - (cb, data) = self._callbacks[user_data] - cb(self, cur.contents, prev.contents, user_data) - - @_catch_hook_exception - def _hookcode_cb(self, handle, address, size, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - cb(self, address, size, data) - - @_catch_hook_exception - def _hook_mem_invalid_cb(self, handle, access, address, size, value, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - return cb(self, access, address, size, value, data) - - @_catch_hook_exception - def _hook_mem_access_cb(self, handle, access, address, size, value, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - cb(self, access, address, size, value, data) - - @_catch_hook_exception - def _hook_intr_cb(self, handle, intno, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - cb(self, intno, data) - - @_catch_hook_exception - def _hook_insn_invalid_cb(self, handle, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - return cb(self, data) - - @_catch_hook_exception - def _hook_insn_in_cb(self, handle, port, size, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - return cb(self, port, size, data) - - @_catch_hook_exception - def _hook_insn_sys_cb(self, handle, reg, pcp_reg, user_data): - cp_reg = ctypes.cast(pcp_reg, ctypes.POINTER(uc_arm64_cp_reg)).contents - - (cb, data) = self._callbacks[user_data] - - return cb(self, reg, cp_reg, data) - - @_catch_hook_exception - def _hook_insn_out_cb(self, handle, port, size, value, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - cb(self, port, size, value, data) - - @_catch_hook_exception - def _hook_insn_syscall_cb(self, handle, user_data): - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - cb(self, data) - - @_catch_hook_exception - def _hook_insn_cpuid_cb(self, handle: int, user_data: int) -> int: - # call user's callback with self object - (cb, data) = self._callbacks[user_data] - return cb(self, data) - - def ctl(self, control: int, *args): - status = _uc.uc_ctl(self._uch, control, *args) - if status != uc.UC_ERR_OK: - raise UcError(status) - return status - - def __ctl(self, ctl, nr, rw): - return ctl | (nr << 26) | (rw << 30) - - def __ctl_r(self, ctl, nr): - return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ) - - def __ctl_w(self, ctl, nr): - return self.__ctl(ctl, nr, uc.UC_CTL_IO_WRITE) - - def __ctl_rw(self, ctl, nr): - return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ_WRITE) - - def __ctl_r_1_arg(self, ctl, ctp): - arg = ctp() - self.ctl(self.__ctl_r(ctl, 1), ctypes.byref(arg)) - return arg.value - - def __ctl_w_1_arg(self, ctl, val, ctp): - arg = ctp(val) - self.ctl(self.__ctl_w(ctl, 1), arg) - - def __ctl_w_2_arg(self, ctl, val1, val2, ctp1, ctp2): - arg1 = ctp1(val1) - arg2 = ctp2(val2) - self.ctl(self.__ctl_w(ctl, 2), arg1, arg2) - - def __ctl_rw_1_1_arg(self, ctl, val, ctp1, ctp2): - arg1 = ctp1(val) - arg2 = ctp2() - self.ctl(self.__ctl_rw(ctl, 2), arg1, ctypes.byref(arg2)) - return arg2 - - def ctl_get_mode(self): - return self.__ctl_r_1_arg(uc.UC_CTL_UC_MODE, ctypes.c_int) - - def ctl_get_page_size(self): - return self.__ctl_r_1_arg(uc.UC_CTL_UC_PAGE_SIZE, ctypes.c_uint32) - - def ctl_set_page_size(self, val: int): - self.__ctl_w_1_arg(uc.UC_CTL_UC_PAGE_SIZE, val, ctypes.c_uint32) - - def ctl_get_arch(self): - return self.__ctl_r_1_arg(uc.UC_CTL_UC_ARCH, ctypes.c_int) - - def ctl_get_timeout(self): - return self.__ctl_r_1_arg(uc.UC_CTL_UC_TIMEOUT, ctypes.c_uint64) - - def ctl_exits_enabled(self, val: bool): - self.__ctl_w_1_arg(uc.UC_CTL_UC_USE_EXITS, val, ctypes.c_int) - - def ctl_get_exits_cnt(self): - return self.__ctl_r_1_arg(uc.UC_CTL_UC_EXITS_CNT, ctypes.c_size_t) - - def ctl_get_exits(self): - l = self.ctl_get_exits_cnt() - arr = (ctypes.c_uint64 * l)() - self.ctl(self.__ctl_r(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(l)) - return [i for i in arr] - - def ctl_set_exits(self, exits: List[int]): - arr = (ctypes.c_uint64 * len(exits))() - for idx, exit in enumerate(exits): - arr[idx] = exit - self.ctl(self.__ctl_w(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(len(exits))) - - def ctl_get_cpu_model(self): - return self.__ctl_r_1_arg(uc.UC_CTL_CPU_MODEL, ctypes.c_int) - - def ctl_set_cpu_model(self, val: int): - self.__ctl_w_1_arg(uc.UC_CTL_CPU_MODEL, val, ctypes.c_int) - - def ctl_remove_cache(self, addr: int, end: int): - self.__ctl_w_2_arg(uc.UC_CTL_TB_REMOVE_CACHE, addr, end, ctypes.c_uint64, ctypes.c_uint64) - - def ctl_request_cache(self, addr: int): - return self.__ctl_rw_1_1_arg(uc.UC_CTL_TB_REQUEST_CACHE, addr, ctypes.c_uint64, uc_tb) - - def ctl_flush_tb(self): - self.ctl(self.__ctl_w(uc.UC_CTL_TB_FLUSH, 0)) - - def ctl_tlb_mode(self, mode: int): - self.__ctl_w_1_arg(uc.UC_CTL_TLB_TYPE, mode, ctypes.c_int) - - # add a hook - def hook_add(self, htype: int, callback: UC_HOOK_CALLBACK_TYPE , user_data: Any=None, begin: int=1, end: int=0, arg1: int=0, arg2: int=0): - _h2 = uc_hook_h() - - # save callback & user_data - self._callback_count += 1 - self._callbacks[self._callback_count] = (callback, user_data) - cb = None - - if htype == uc.UC_HOOK_INSN: - insn = ctypes.c_int(arg1) - if arg1 == x86_const.UC_X86_INS_IN: # IN instruction - cb = ctypes.cast(UC_HOOK_INSN_IN_CB(self._hook_insn_in_cb), UC_HOOK_INSN_IN_CB) - if arg1 == x86_const.UC_X86_INS_OUT: # OUT instruction - cb = ctypes.cast(UC_HOOK_INSN_OUT_CB(self._hook_insn_out_cb), UC_HOOK_INSN_OUT_CB) - if arg1 in (x86_const.UC_X86_INS_SYSCALL, x86_const.UC_X86_INS_SYSENTER): # SYSCALL/SYSENTER instruction - cb = ctypes.cast(UC_HOOK_INSN_SYSCALL_CB(self._hook_insn_syscall_cb), UC_HOOK_INSN_SYSCALL_CB) - if arg1 == x86_const.UC_X86_INS_CPUID: # CPUID instruction - cb = ctypes.cast(UC_HOOK_INSN_CPUID_CB(self._hook_insn_cpuid_cb), UC_HOOK_INSN_CPUID_CB) - if arg1 in (arm64_const.UC_ARM64_INS_MRS, arm64_const.UC_ARM64_INS_MSR, arm64_const.UC_ARM64_INS_SYS, arm64_const.UC_ARM64_INS_SYSL): - cb = ctypes.cast(UC_HOOK_INSN_SYS_CB(self._hook_insn_sys_cb), UC_HOOK_INSN_SYS_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end), insn - ) - elif htype == uc.UC_HOOK_TCG_OPCODE: - opcode = ctypes.c_int(arg1) - flags = ctypes.c_int(arg2) - - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, ctypes.cast(UC_HOOK_TCG_OPCODE_CB(self._hook_tcg_op_cb), UC_HOOK_TCG_OPCODE_CB), - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end), opcode, flags - ) - elif htype == uc.UC_HOOK_INTR: - cb = ctypes.cast(UC_HOOK_INTR_CB(self._hook_intr_cb), UC_HOOK_INTR_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - elif htype == uc.UC_HOOK_INSN_INVALID: - cb = ctypes.cast(UC_HOOK_INSN_INVALID_CB(self._hook_insn_invalid_cb), UC_HOOK_INSN_INVALID_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - elif htype == uc.UC_HOOK_EDGE_GENERATED: - cb = ctypes.cast(UC_HOOK_EDGE_GEN_CB(self._hook_edge_gen_cb), UC_HOOK_EDGE_GEN_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - else: - if htype in (uc.UC_HOOK_BLOCK, uc.UC_HOOK_CODE): - # set callback with wrapper, so it can be called - # with this object as param - cb = ctypes.cast(UC_HOOK_CODE_CB(self._hookcode_cb), UC_HOOK_CODE_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - elif htype & (uc.UC_HOOK_MEM_READ_UNMAPPED | - uc.UC_HOOK_MEM_WRITE_UNMAPPED | - uc.UC_HOOK_MEM_FETCH_UNMAPPED | - uc.UC_HOOK_MEM_READ_PROT | - uc.UC_HOOK_MEM_WRITE_PROT | - uc.UC_HOOK_MEM_FETCH_PROT): - cb = ctypes.cast(UC_HOOK_MEM_INVALID_CB(self._hook_mem_invalid_cb), UC_HOOK_MEM_INVALID_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - else: - cb = ctypes.cast(UC_HOOK_MEM_ACCESS_CB(self._hook_mem_access_cb), UC_HOOK_MEM_ACCESS_CB) - status = _uc.uc_hook_add( - self._uch, ctypes.byref(_h2), htype, cb, - ctypes.cast(self._callback_count, ctypes.c_void_p), - ctypes.c_uint64(begin), ctypes.c_uint64(end) - ) - - # save the ctype function so gc will leave it alone. - self._ctype_cbs.append(cb) + status = uclib.uc_emu_stop(self._uch) if status != uc.UC_ERR_OK: raise UcError(status) - return _h2.value + ########################### + # CPU state accessors # + ########################### - # delete a hook - def hook_del(self, h: int): - _h = uc_hook_h(h) - status = _uc.uc_hook_del(self._uch, _h) - if status != uc.UC_ERR_OK: - raise UcError(status) - h = 0 + def _do_reg_read(self, reg_id: int, reg_obj) -> int: + """Private register read implementation. + Do not call directly. + """ + + return uclib.uc_reg_read(self._uch, reg_id, reg_obj) + + def _do_reg_write(self, reg_id: int, reg_obj) -> int: + """Private register write implementation. + Do not call directly. + """ + + return uclib.uc_reg_write(self._uch, reg_id, reg_obj) + + ########################### + # Memory management # + ########################### + + def mem_map(self, address: int, size: int, perms: int = uc.UC_PROT_ALL) -> None: + """Map a memory range. + + Args: + address : range base address + size : range size (in bytes) + perms : access protection bitmask + + Raises: `UcError` in case memory could not be mapped + """ + + assert (perms & ~uc.UC_PROT_ALL) == 0, 'unexpected perms bitmask' + + status = uclib.uc_mem_map(self._uch, address, size, perms) - def context_save(self): - context = UcContext(self._uch, self._arch, self._mode) - status = _uc.uc_context_save(self._uch, context.context) if status != uc.UC_ERR_OK: raise UcError(status) - return context + def mem_map_ptr(self, address: int, size: int, perms: int, ptr: int) -> None: + """Map a memory range and point to existing data on host memory. + + Args: + address : range base address + size : range size (in bytes) + perms : access protection bitmask + ptr : address of data on host memory + + Raises: `UcError` in case memory could not be mapped + """ + + assert (perms & ~uc.UC_PROT_ALL) == 0, 'unexpected perms bitmask' + + status = uclib.uc_mem_map_ptr(self._uch, address, size, perms, ptr) - def context_update(self, context: UcContext): - status = _uc.uc_context_save(self._uch, context.context) if status != uc.UC_ERR_OK: raise UcError(status) - def context_restore(self, context: UcContext): - status = _uc.uc_context_restore(self._uch, context.context) + def mem_unmap(self, address: int, size: int) -> None: + """Reclaim a mapped memory range. + + Args: + address : range base address + size : range size (in bytes) + + Raises: `UcError` in case memory could not be unmapped + """ + + status = uclib.uc_mem_unmap(self._uch, address, size) + if status != uc.UC_ERR_OK: raise UcError(status) - # this returns a generator of regions in the form (begin, end, perms) - def mem_regions(self): + # TODO: this is where mmio callbacks need to be released from cache, + # but we cannot tell whether this is an mmio range. also, memory ranges + # might be splitted by 'map_protect' after they were mapped, so the + # (start, end) tuple may not be suitable for retrieving the callbacks. + # + # here we try to do that on a best-effort basis: + + rng = (address, address + size) + + if rng in self._mmio_callbacks: + del self._mmio_callbacks[rng] + + def mem_protect(self, address: int, size: int, perms: int = uc.UC_PROT_ALL) -> None: + """Modify access protection bitmask of a mapped memory range. + + Args: + address : range base address + size : range size (in bytes) + perms : new access protection bitmask + + Raises: `UcError` in case access protection bitmask could not be changed + """ + + assert (perms & ~uc.UC_PROT_ALL) == 0, 'unexpected perms bitmask' + + status = uclib.uc_mem_protect(self._uch, address, size, perms) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + def mmio_map(self, address: int, size: int, + read_cb: Optional[UC_MMIO_READ_TYPE], user_data_read: Any, + write_cb: Optional[UC_MMIO_WRITE_TYPE], user_data_write: Any) -> None: + + @uccallback(MMIO_READ_CFUNC) + def __mmio_map_read_cb(handle: int, offset: int, size: int, key: int) -> int: + assert read_cb is not None + + return read_cb(self, offset, size, user_data_read) + + @uccallback(MMIO_WRITE_CFUNC) + def __mmio_map_write_cb(handle: int, offset: int, size: int, value: int, key: int) -> None: + assert write_cb is not None + + write_cb(self, offset, size, value, user_data_write) + + read_cb_fptr = read_cb and __mmio_map_read_cb + write_cb_fptr = write_cb and __mmio_map_write_cb + + status = uclib.uc_mmio_map(self._uch, address, size, read_cb_fptr, 0, write_cb_fptr, 0) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + # hold a reference to mmio callbacks + rng = (address, address + size) + + self._mmio_callbacks[rng] = (read_cb_fptr, write_cb_fptr) + + def mem_regions(self) -> Iterator[Tuple[int, int, int]]: + """Iterate through mapped memory regions. + + Returns: an iterator whose elements contain begin, end and perms properties of each range + + Raises: `UcError` in case an itnernal error has been encountered + """ + regions = ctypes.POINTER(_uc_mem_region)() count = ctypes.c_uint32() - status = _uc.uc_mem_regions(self._uch, ctypes.byref(regions), ctypes.byref(count)) + status = uclib.uc_mem_regions(self._uch, ctypes.byref(regions), ctypes.byref(count)) + if status != uc.UC_ERR_OK: raise UcError(status) try: for i in range(count.value): - yield (regions[i].begin, regions[i].end, regions[i].perms) + yield regions[i].value + finally: - _uc.uc_free(regions) + uclib.uc_free(regions) + def mem_read(self, address: int, size: int) -> bytearray: + """Read data from emulated memory subsystem. + + Args: + address : source memory location + size : amount of bytes to read + + Returns: data bytes + + Raises: `UcError` in case of an invalid memory access + """ + + data = ctypes.create_string_buffer(size) + status = uclib.uc_mem_read(self._uch, address, data, size) + + if status != uc.UC_ERR_OK: + raise UcError(status, address, size) + + return bytearray(data) + + def mem_write(self, address: int, data: bytes) -> None: + """Write data to emulated memory subsystem. + + Args: + address : target memory location + data : data bytes to write + + Raises: `UcError` in case of an invalid memory access + """ + + size = len(data) + status = uclib.uc_mem_write(self._uch, address, data, size) + + if status != uc.UC_ERR_OK: + raise UcError(status, address, size) + + ########################### + # Event hooks management # + ########################### + + def __do_hook_add(self, htype: int, fptr: ctypes._FuncPointer, begin: int, end: int, *args: ctypes.c_int) -> int: + handle = uc_hook_h() + + # TODO: we do not need a callback counter to reference the callback and user data anymore, + # so just pass a dummy value. that value will become the unused 'key' argument + dummy = 0 + + status = uclib.uc_hook_add( + self._uch, + ctypes.byref(handle), + htype, + fptr, + ctypes.cast(dummy, ctypes.c_void_p), + ctypes.c_uint64(begin), + ctypes.c_uint64(end), + *args + ) -class UcContext: - def __init__(self, h, arch, mode): - self._context = uc_context() - self._size = _uc.uc_context_size(h) - self._to_free = True - status = _uc.uc_context_alloc(h, ctypes.byref(self._context)) if status != uc.UC_ERR_OK: raise UcError(status) + + # hold a reference to the funcion pointer to prevent it from being gc-ed + self._callbacks[handle.value] = fptr + + return handle.value + + def hook_add(self, htype: int, callback: Callable, user_data: Any = None, begin: int = 1, end: int = 0, aux1: int = 0, aux2: int = 0) -> int: + """Hook emulated events of a certain type. + + Args: + htype : event type(s) to hook (see UC_HOOK_* constants) + callback : a method to call each time the hooked event occurs + user_data : an additional context to pass to the callback when it is called + begin : address where hook scope starts + end : address where hook scope ends + aux1 : auxiliary parameter; needed for some hook types + aux2 : auxiliary parameter; needed for some hook types + + Returns: hook handle + + Raises: `UcError` in case of an invalid htype value + """ + + def __hook_intr(): + @uccallback(HOOK_INTR_CFUNC) + def __hook_intr_cb(handle: int, intno: int, key: int): + callback(self, intno, user_data) + + return __hook_intr_cb, + + def __hook_insn(): + # each arch is expected to overload hook_add and implement this handler on their own. + # if we got here, it means this particular architecture does not support hooking any + # instruction and so we fail + raise UcError(uc.UC_ERR_ARG) + + def __hook_code(): + @uccallback(HOOK_CODE_CFUNC) + def __hook_code_cb(handle: int, address: int, size: int, key: int): + callback(self, address, size, user_data) + + return __hook_code_cb, + + def __hook_invalid_mem(): + @uccallback(HOOK_MEM_INVALID_CFUNC) + def __hook_mem_invalid_cb(handle: int, access: int, address: int, size: int, value: int, key: int) -> bool: + return callback(self, access, address, size, value, user_data) + + return __hook_mem_invalid_cb, + + def __hook_mem(): + @uccallback(HOOK_MEM_ACCESS_CFUNC) + def __hook_mem_access_cb(handle: int, access: int, address: int, size: int, value: int, key: int) -> None: + callback(self, access, address, size, value, user_data) + + return __hook_mem_access_cb, + + def __hook_invalid_insn(): + @uccallback(HOOK_INSN_INVALID_CFUNC) + def __hook_insn_invalid_cb(handle: int, key: int) -> bool: + return callback(self, user_data) + + return __hook_insn_invalid_cb, + + def __hook_edge_gen(): + @uccallback(HOOK_EDGE_GEN_CFUNC) + def __hook_edge_gen_cb(handle: int, cur: ctypes._Pointer[uc_tb], prev: ctypes._Pointer[uc_tb], key: int): + callback(self, cur.contents, prev.contents, user_data) + + return __hook_edge_gen_cb, + + def __hook_tcg_opcode(): + @uccallback(HOOK_TCG_OPCODE_CFUNC) + def __hook_tcg_op_cb(handle: int, address: int, arg1: int, arg2: int, size: int, key: int): + callback(self, address, arg1, arg2, size, user_data) + + opcode = ctypes.c_uint64(aux1) + flags = ctypes.c_uint64(aux2) + + return __hook_tcg_op_cb, opcode, flags + + handlers: Mapping[int, Callable[[], Tuple]] = { + uc.UC_HOOK_INTR : __hook_intr, + uc.UC_HOOK_INSN : __hook_insn, + uc.UC_HOOK_CODE : __hook_code, + uc.UC_HOOK_BLOCK : __hook_code, + uc.UC_HOOK_MEM_READ_UNMAPPED : __hook_invalid_mem, + uc.UC_HOOK_MEM_WRITE_UNMAPPED : __hook_invalid_mem, + uc.UC_HOOK_MEM_FETCH_UNMAPPED : __hook_invalid_mem, + uc.UC_HOOK_MEM_READ_PROT : __hook_invalid_mem, + uc.UC_HOOK_MEM_WRITE_PROT : __hook_invalid_mem, + uc.UC_HOOK_MEM_FETCH_PROT : __hook_invalid_mem, + uc.UC_HOOK_MEM_READ : __hook_mem, + uc.UC_HOOK_MEM_WRITE : __hook_mem, + uc.UC_HOOK_MEM_FETCH : __hook_mem, + # uc.UC_HOOK_MEM_READ_AFTER + uc.UC_HOOK_INSN_INVALID : __hook_invalid_insn, + uc.UC_HOOK_EDGE_GENERATED : __hook_edge_gen, + uc.UC_HOOK_TCG_OPCODE : __hook_tcg_opcode + } + + # the same callback may be registered for multiple hook types if they + # share the same handling method. here we iterate through htype set bits + # and collect all unique handlers it refers to (no duplicates) + matched = set(handlers.get(1 << n) for n in range(32) if htype & (1 << n)) + + # the set of matched handlers is expected to include exactly one element. + # more than one member indicates that htype refers to more than one handler + # at the same time, whereas callbacks cannot be assigned to different handlers. + # an empty set indicates a matching handler was not found, probably due to + # an invalid htype value + if len(matched) != 1: + raise UcError(uc.UC_ERR_ARG) + + handler = matched.pop() + + # a None element indicates that htype has an unrecognized bit set + if handler is None: + raise UcError(uc.UC_ERR_ARG) + + fptr, *aux = handler() + + return self.__do_hook_add(htype, fptr, begin, end, *aux) + + def hook_del(self, handle: int) -> None: + """Remove an existing hook. + + Args: + handle: hook handle + """ + + h = uc_hook_h(handle) + status = uclib.uc_hook_del(self._uch, h) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + del self._callbacks[handle] + + def query(self, prop: int) -> int: + """Query an internal Unicorn property. + + Args: + prop: property identifier (see: UC_QUERY_* constants) + + Returns: property value + """ + + result = ctypes.c_size_t() + status = uclib.uc_query(self._uch, prop, ctypes.byref(result)) + + if status != uc.UC_ERR_OK: + raise UcError(status, prop) + + return result.value + + def context_save(self) -> UcContext: + context = UcContext(self._uch, self._arch, self._mode) + status = uclib.uc_context_save(self._uch, context.context) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + return context + + def context_update(self, context: UcContext) -> None: + status = uclib.uc_context_save(self._uch, context.context) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + def context_restore(self, context: UcContext) -> None: + status = uclib.uc_context_restore(self._uch, context.context) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + @staticmethod + def __ctl_encode(ctl: int, op: int, nargs: int) -> int: + assert nargs and (nargs & ~0b1111) == 0 + assert op and (op & ~0b11) == 0 + + return (op << 30) | (nargs << 26) | ctl + + def ctl(self, ctl: int, op: int, *args): + code = Uc.__ctl_encode(ctl, op, len(args)) + + status = uclib.uc_ctl(self._uch, code, *args) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + Arg = Tuple[Type, Optional[int]] + + def __ctl_r(self, ctl: int, arg0: Arg): + atype, _ = arg0 + carg = atype() + + self.ctl(ctl, uc.UC_CTL_IO_READ, ctypes.byref(carg)) + + return carg.value + + def __ctl_w(self, ctl: int, *args: Arg): + cargs = (atype(avalue) for atype, avalue in args) + + self.ctl(ctl, uc.UC_CTL_IO_WRITE, *cargs) + + def __ctl_wr(self, ctl: int, arg0: Arg, arg1: Arg): + atype, avalue = arg0 + carg0 = atype(avalue) + + atype, _ = arg1 + carg1 = atype() + + self.ctl(ctl, uc.UC_CTL_IO_READ_WRITE, carg0, ctypes.byref(carg1)) + + return carg1 + + def ctl_get_mode(self) -> int: + return self.__ctl_r(uc.UC_CTL_UC_MODE, + (ctypes.c_int, None) + ) + + def ctl_get_page_size(self) -> int: + return self.__ctl_r(uc.UC_CTL_UC_PAGE_SIZE, + (ctypes.c_uint32, None) + ) + + def ctl_set_page_size(self, val: int) -> None: + self.__ctl_w(uc.UC_CTL_UC_PAGE_SIZE, + (ctypes.c_uint32, val) + ) + + def ctl_get_arch(self) -> int: + return self.__ctl_r(uc.UC_CTL_UC_ARCH, + (ctypes.c_int, None) + ) + + def ctl_get_timeout(self) -> int: + return self.__ctl_r(uc.UC_CTL_UC_TIMEOUT, + (ctypes.c_uint64, None) + ) + + def ctl_exits_enabled(self, val: bool) -> None: + self.__ctl_w(uc.UC_CTL_UC_USE_EXITS, + (ctypes.c_int, val) + ) + + def ctl_get_exits_cnt(self) -> int: + return self.__ctl_r(uc.UC_CTL_UC_EXITS_CNT, + (ctypes.c_size_t, None) + ) + + def ctl_get_exits(self) -> Sequence[int]: + count = self.ctl_get_exits_cnt() + arr = (ctypes.c_uint64 * count)() + + self.ctl(uc.UC_CTL_UC_EXITS, uc.UC_CTL_IO_READ, ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(count)) + + return tuple(i for i in arr) + + def ctl_set_exits(self, exits: Sequence[int]) -> None: + arr = (ctypes.c_uint64 * len(exits))() + + for idx, exit in enumerate(exits): + arr[idx] = exit + + self.ctl(uc.UC_CTL_UC_EXITS, uc.UC_CTL_IO_WRITE, ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(len(exits))) + + def ctl_get_cpu_model(self) -> int: + return self.__ctl_r(uc.UC_CTL_CPU_MODEL, + (ctypes.c_int, None) + ) + + def ctl_set_cpu_model(self, val: int) -> None: + self.__ctl_w(uc.UC_CTL_CPU_MODEL, + (ctypes.c_int, val) + ) + + def ctl_remove_cache(self, addr: int, end: int) -> None: + self.__ctl_w(uc.UC_CTL_TB_REMOVE_CACHE, + (ctypes.c_uint64, addr), + (ctypes.c_uint64, end) + ) + + def ctl_request_cache(self, addr: int): + return self.__ctl_wr(uc.UC_CTL_TB_REQUEST_CACHE, + (ctypes.c_uint64, addr), + (uc_tb, None) + ) + + def ctl_flush_tb(self) -> None: + self.__ctl_w(uc.UC_CTL_TB_FLUSH) + + def ctl_tlb_mode(self, mode: int) -> None: + self.__ctl_w(uc.UC_CTL_TLB_TYPE, + (ctypes.c_uint, mode) + ) + + +class UcContext(RegStateManager): + def __init__(self, h, arch: int, mode: int): + self._context = uc_context() + self._size = uclib.uc_context_size(h) + + status = uclib.uc_context_alloc(h, ctypes.byref(self._context)) + + if status != uc.UC_ERR_OK: + raise UcError(status) + + self._to_free = True self._arch = arch self._mode = mode @@ -980,95 +1102,57 @@ class UcContext: return self._context @property - def size(self): + def size(self) -> int: return self._size @property - def arch(self): + def arch(self) -> int: return self._arch - + @property - def mode(self): + def mode(self) -> int: return self._mode - # return the value of a register - def reg_read(self, reg_id, opt=None): - return reg_read(functools.partial(_uc.uc_context_reg_read, self._context), self.arch, reg_id, opt) + # RegStateManager mixin method implementation + def _do_reg_read(self, reg_id: int, reg_obj) -> int: + """Private register read implementation. + """ - # write to a register - def reg_write(self, reg_id, value): - return reg_write(functools.partial(_uc.uc_context_reg_write, self._context), self.arch, reg_id, value) + return uclib.uc_context_reg_read(self._context, reg_id, reg_obj) + + # RegStateManager mixin method implementation + def _do_reg_write(self, reg_id: int, reg_obj) -> int: + """Private register write implementation. + """ + + return uclib.uc_context_reg_write(self._context, reg_id, reg_obj) # Make UcContext picklable def __getstate__(self): - return (bytes(self), self.size, self.arch, self.mode) + return bytes(self), self.size, self.arch, self.mode - def __setstate__(self, state): - self._size = state[1] - self._context = ctypes.cast(ctypes.create_string_buffer(state[0], self._size), uc_context) - # __init__ won'e be invoked, so we are safe to set it here. + def __setstate__(self, state) -> None: + context, size, arch, mode = state + + self._context = ctypes.cast(ctypes.create_string_buffer(context, size), uc_context) + self._size = size + self._arch = arch + self._mode = mode + + # __init__ won't be invoked, so we are safe to set it here. self._to_free = False - self._arch = state[2] - self._mode = state[3] - def __bytes__(self): + def __bytes__(self) -> bytes: return ctypes.string_at(self.context, self.size) - def __del__(self): + def __del__(self) -> None: # We need this property since we shouldn't free it if the object is constructed from pickled bytes. if self._to_free: - _uc.uc_context_free(self._context) + uclib.uc_context_free(self._context) + -UC_HOOK_CODE_TYPE = Callable[[Uc, int, int, Any], None] -UC_HOOK_INSN_INVALID_TYPE = Callable[[Uc, Any], bool] -UC_HOOK_MEM_INVALID_TYPE = Callable[[Uc, int, int, int, int, Any], bool] -UC_HOOK_MEM_ACCESS_TYPE = Callable[[Uc, int, int, int, int, Any], None] -UC_HOOK_INTR_TYPE = Callable[[Uc, int, Any], None] -UC_HOOK_INSN_IN_TYPE = Callable[[Uc, int, int, Any], int] -UC_HOOK_INSN_OUT_TYPE = Callable[[Uc, int, int, int, Any], None] -UC_HOOK_INSN_SYSCALL_TYPE = Callable[[Uc, Any], None] -UC_HOOK_INSN_SYS_TYPE = Callable[[Uc, int, Tuple[int, int, int, int, int, int], Any], int] -UC_HOOK_INSN_CPUID_TYPE = Callable[[Uc, Any], int] UC_MMIO_READ_TYPE = Callable[[Uc, int, int, Any], int] UC_MMIO_WRITE_TYPE = Callable[[Uc, int, int, int, Any], None] -UC_HOOK_EDGE_GEN_TYPE = Callable[[Uc, uc_tb, uc_tb, Any], None] -UC_HOOK_TCG_OPCODE_TYPE = Callable[[Uc, int, int, int, Any], None] -UC_HOOK_CALLBACK_TYPE = Union[ - UC_HOOK_CODE_TYPE, - UC_HOOK_INSN_INVALID_TYPE, - UC_HOOK_MEM_INVALID_TYPE, - UC_HOOK_MEM_ACCESS_TYPE, - UC_HOOK_INSN_IN_TYPE, - UC_HOOK_INSN_OUT_TYPE, - UC_HOOK_INSN_SYSCALL_TYPE, - UC_HOOK_INSN_SYS_TYPE, - UC_HOOK_INSN_CPUID_TYPE, - UC_HOOK_EDGE_GEN_TYPE, - UC_HOOK_TCG_OPCODE_TYPE -] -# print out debugging info -def debug(): - archs = { - "arm": uc.UC_ARCH_ARM, - "arm64": uc.UC_ARCH_ARM64, - "mips": uc.UC_ARCH_MIPS, - "sparc": uc.UC_ARCH_SPARC, - "m68k": uc.UC_ARCH_M68K, - "x86": uc.UC_ARCH_X86, - "riscv": uc.UC_ARCH_RISCV, - "ppc": uc.UC_ARCH_PPC, - } - - all_archs = "" - keys = archs.keys() - for k in sorted(keys): - if uc_arch_supported(archs[k]): - all_archs += "-%s" % k - - major, minor, _combined = uc_version() - - return "python-%s-c%u.%u-b%u.%u" % ( - all_archs, major, minor, uc.UC_API_MAJOR, uc.UC_API_MINOR - ) +__all__ = ['Uc', 'UcContext', 'ucsubclass']