From 923b4ad3cc28194c1848409e546ef6f5257b39af Mon Sep 17 00:00:00 2001 From: lazymio Date: Thu, 4 Nov 2021 21:47:30 +0100 Subject: [PATCH] Update python bindings --- bindings/python/sample_ctl.py | 122 +++++++++++++++++++++++++++++ bindings/python/unicorn/unicorn.py | 120 +++++++++++++++++++++++++++- 2 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 bindings/python/sample_ctl.py diff --git a/bindings/python/sample_ctl.py b/bindings/python/sample_ctl.py new file mode 100644 index 00000000..8e33ad29 --- /dev/null +++ b/bindings/python/sample_ctl.py @@ -0,0 +1,122 @@ +# Unicorn Emulator Engine +# By Lazymio(@wtdcode), 2021 + +from unicorn import * +from unicorn.unicorn import UC_HOOK_EDGE_GEN_CB +from unicorn.x86_const import * +from datetime import datetime + +def test_uc_ctl_read(): + uc = Uc(UC_ARCH_X86, UC_MODE_32) + + print("Reading some properties by uc_ctl.") + + arch = uc.ctl_get_arch() + + mode = uc.ctl_get_mode() + + page_size = uc.ctl_get_page_size() + + timeout = uc.ctl_get_timeout() + + print(f">>> arch={arch} mode={mode} page size={page_size} timeout={timeout}") + +def time_emulation(uc, start, end): + n = datetime.now() + + uc.emu_start(start, end) + + return (datetime.now() - n).total_seconds() * 1e6 + +def test_uc_ctl_tb_cache(): + # Initialize emulator in X86-32bit mode + uc = Uc(UC_ARCH_X86, UC_MODE_32) + addr = 0x10000 + + # Fill the code buffer with NOP. + code = b"\x90" * 8 * 512 + + print("Controling the TB cache in a finer granularity by uc_ctl.") + + uc.mem_map(addr, 0x10000) + + # Write our code to the memory. + uc.mem_write(addr, code) + + # Do emulation without any cache. + standard = time_emulation(uc, addr, addr + len(code)) + + # Now we request cache for all TBs. + for i in range(8): + tb = uc.ctl_request_cache(addr + i * 512) + print(f">>> TB is cached at {hex(tb.pc)} which has {tb.icount} instructions with {tb.size} bytes") + + # Do emulation with all TB cached. + cached = time_emulation(uc, addr, addr + len(code)) + + # Now we clear cache for all TBs. + for i in range(8): + uc.ctl_remove_cache(addr + i * 512) + + evicted = time_emulation(uc, addr, addr + len(code)) + + print(f">>> Run time: First time {standard}, Cached: {cached}, Cached evicted: {evicted}") + +def trace_new_edge(uc, cur, prev, data): + print(f">>> Getting a new edge from {hex(prev.pc + prev.size - 1)} to {hex(cur.pc)}") + +def trace_tcg_sub(uc, address, arg1, arg2, data): + print(f">>> Get a tcg sub opcode at {hex(address)} with args: {arg1} and {arg2}") + +def test_uc_ctl_exits(): + uc = Uc(UC_ARCH_X86, UC_MODE_32) + addr = 0x1000 + # cmp eax, 0; + # jg lb; + # inc eax; + # nop; + # lb: + # inc ebx; + # nop; + code = b"\x83\xf8\x00\x7f\x02\x40\x90\x43\x90" + exits = [addr + 6, addr + 8] + + print("Using multiple exits by uc_ctl") + + uc.mem_map(addr, 0x1000) + + # Write our code to the memory. + uc.mem_write(addr, code) + + # We trace if any new edge is generated. + uc.hook_add(UC_HOOK_EDGE_GENERATED, trace_new_edge) + + # Trace cmp instruction. + uc.hook_add(UC_HOOK_TCG_OPCODE, trace_tcg_sub, UC_TCG_OP_SUB, UC_TCG_OP_FLAG_CMP) + + uc.ctl_exits_enabled(True) + + uc.ctl_set_exits(exits) + + # This should stop at ADDRESS + 6 and increase eax, even thouhg we don't provide an exit. + uc.emu_start(addr, 0) + + eax = uc.reg_read(UC_X86_REG_EAX) + ebx = uc.reg_read(UC_X86_REG_EBX) + + print(f">>> eax = {hex(eax)} and ebx = {hex(ebx)} after the first emulation") + + # This should stop at ADDRESS + 8, even thouhg we don't provide an exit. + uc.emu_start(addr, 0) + + eax = uc.reg_read(UC_X86_REG_EAX) + ebx = uc.reg_read(UC_X86_REG_EBX) + + print(f">>> eax = {hex(eax)} and ebx = {hex(ebx)} after the first emulation") + +if __name__ == "__main__": + test_uc_ctl_read() + print("="*32) + test_uc_ctl_tb_cache() + print("="*32) + test_uc_ctl_exits() \ No newline at end of file diff --git a/bindings/python/unicorn/unicorn.py b/bindings/python/unicorn/unicorn.py index 19df82e0..c929c23c 100644 --- a/bindings/python/unicorn/unicorn.py +++ b/bindings/python/unicorn/unicorn.py @@ -121,6 +121,13 @@ class _uc_mem_region(ctypes.Structure): ("perms", ctypes.c_uint32), ] +class uc_tb(ctypes.Structure): + """"TranslationBlock""" + _fields_ = [ + ("pc", ctypes.c_uint64), + ("icount", ctypes.c_uint16), + ("size", ctypes.c_uint16) + ] _setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)) _setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int) @@ -152,6 +159,7 @@ _setup_prototype(_uc, "uc_context_free", ucerr, uc_context) _setup_prototype(_uc, "uc_mem_regions", ucerr, uc_engine, ctypes.POINTER(ctypes.POINTER(_uc_mem_region)), ctypes.POINTER(ctypes.c_uint32)) # https://bugs.python.org/issue42880 _setup_prototype(_uc, "uc_hook_add", ucerr, uc_engine, ctypes.POINTER(uc_hook_h), ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64) +_setup_prototype(_uc, "uc_ctl", ucerr, uc_engine, ctypes.c_int) UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p) UC_HOOK_INSN_INVALID_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_void_p) @@ -180,6 +188,12 @@ UC_MMIO_READ_CB = ctypes.CFUNCTYPE( UC_MMIO_WRITE_CB = ctypes.CFUNCTYPE( None, uc_engine, ctypes.c_uint64, ctypes.c_int, ctypes.c_uint64, ctypes.c_void_p ) +UC_HOOK_EDGE_GEN_CB = ctypes.CFUNCTYPE( + None, uc_engine, ctypes.POINTER(uc_tb), ctypes.POINTER(uc_tb), ctypes.c_void_p +) +UC_HOOK_TCG_OPCODE_CB = ctypes.CFUNCTYPE( + None, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_void_p +) # access to error code via @errno of UcError class UcError(Exception): @@ -548,6 +562,16 @@ class Uc(object): raise UcError(status) return result.value + @_catch_hook_exception + def _hook_tcg_op_cb(self, handle, address, arg1, arg2, user_data): + (cb, data) = self._callbacks[user_data] + cb(self, address, arg1, arg2, user_data) + + @_catch_hook_exception + def _hook_edge_gen_cb(self, handle, cur, prev, user_data): + (cb, data) = self._callbacks[user_data] + cb(self, cur.contents, prev.contents, user_data) + @_catch_hook_exception def _hookcode_cb(self, handle, address, size, user_data): # call user's callback with self object @@ -596,8 +620,86 @@ class Uc(object): (cb, data) = self._callbacks[user_data] cb(self, data) + def ctl(self, control, *args): + status = _uc.uc_ctl(self._uch, control, *args) + if status != uc.UC_ERR_OK: + raise UcError(status) + return status + + def __ctl(self, ctl, nr, rw): + return ctl | (nr << 26) | (rw << 30) + + def __ctl_r(self, ctl, nr): + return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ) + + def __ctl_w(self, ctl, nr): + return self.__ctl(ctl, nr, uc.UC_CTL_IO_WRITE) + + def __ctl_rw(self, ctl, nr): + return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ_WRITE) + + def __ctl_r_1_arg(self, ctl, ctp): + arg = ctp() + self.ctl(self.__ctl_r(ctl, 1), ctypes.byref(arg)) + return arg.value + + def __ctl_w_1_arg(self, ctl, val, ctp): + arg = ctp(val) + self.ctl(self.__ctl_w(ctl, 1), arg) + + def __ctl_rw_1_1_arg(self, ctl, val, ctp1, ctp2): + arg1 = ctp1(val) + arg2 = ctp2() + self.ctl(self.__ctl_rw(ctl, 2), arg1, ctypes.byref(arg2)) + return arg2 + + def ctl_get_mode(self): + return self.__ctl_r_1_arg(uc.UC_CTL_UC_MODE, ctypes.c_int) + + def ctl_get_page_size(self): + return self.__ctl_r_1_arg(uc.UC_CTL_UC_PAGE_SIZE, ctypes.c_uint32) + + def ctl_set_page_size(self, val): + self.__ctl_w_1_arg(uc.UC_CTL_UC_PAGE_SIZE, val, ctypes.c_uint32) + + def ctl_get_arch(self): + return self.__ctl_r_1_arg(uc.UC_CTL_UC_ARCH, ctypes.c_int) + + def ctl_get_timeout(self): + return self.__ctl_r_1_arg(uc.UC_CTL_UC_TIMEOUT, ctypes.c_uint64) + + def ctl_exits_enabled(self, val): + self.__ctl_w_1_arg(uc.UC_CTL_UC_USE_EXITS, val, ctypes.c_int) + + def ctl_get_exits_cnt(self): + return self.__ctl_r_1_arg(uc.UC_CTL_UC_EXITS_CNT, ctypes.c_size_t) + + def ctl_get_exits(self): + l = self.ctl_get_exits_cnt() + arr = (ctypes.c_uint64 * l)() + self.ctl(self.__ctl_r(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(l)) + return [i for i in arr] + + def ctl_set_exits(self, exits): + arr = (ctypes.c_uint64 * len(exits))() + for idx, exit in enumerate(exits): + arr[idx] = exit + self.ctl(self.__ctl_w(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(len(exits))) + + def ctl_get_cpu_model(self): + return self.__ctl_r_1_arg(uc.UC_CTL_CPU_MODEL, ctypes.c_int) + + def ctl_set_cpu_model(self, val): + self.__ctl_w_1_arg(uc.UC_CTL_CPU_MODEL, val, ctypes.c_int) + + def ctl_remove_cache(self, addr): + self.__ctl_w_1_arg(uc.UC_CTL_TB_REMOVE_CACHE, addr, ctypes.c_uint64) + + def ctl_request_cache(self, addr): + return self.__ctl_rw_1_1_arg(uc.UC_CTL_TB_REQUEST_CACHE, addr, ctypes.c_uint64, uc_tb) + # add a hook - def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0): + def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0, arg2=0): _h2 = uc_hook_h() # save callback & user_data @@ -618,6 +720,15 @@ class Uc(object): ctypes.cast(self._callback_count, ctypes.c_void_p), ctypes.c_uint64(begin), ctypes.c_uint64(end), insn ) + elif htype == uc.UC_HOOK_TCG_OPCODE: + opcode = ctypes.c_int(arg1) + flags = ctypes.c_int(arg2) + + status = _uc.uc_hook_add( + self._uch, ctypes.byref(_h2), htype, ctypes.cast(UC_HOOK_TCG_OPCODE_CB(self._hook_tcg_op_cb), UC_HOOK_TCG_OPCODE_CB), + ctypes.cast(self._callback_count, ctypes.c_void_p), + ctypes.c_uint64(begin), ctypes.c_uint64(end), opcode, flags + ) elif htype == uc.UC_HOOK_INTR: cb = ctypes.cast(UC_HOOK_INTR_CB(self._hook_intr_cb), UC_HOOK_INTR_CB) status = _uc.uc_hook_add( @@ -632,6 +743,13 @@ class Uc(object): ctypes.cast(self._callback_count, ctypes.c_void_p), ctypes.c_uint64(begin), ctypes.c_uint64(end) ) + elif htype == uc.UC_HOOK_EDGE_GENERATED: + cb = ctypes.cast(UC_HOOK_EDGE_GEN_CB(self._hook_edge_gen_cb), UC_HOOK_EDGE_GEN_CB) + status = _uc.uc_hook_add( + self._uch, ctypes.byref(_h2), htype, cb, + ctypes.cast(self._callback_count, ctypes.c_void_p), + ctypes.c_uint64(begin), ctypes.c_uint64(end) + ) else: if htype in (uc.UC_HOOK_BLOCK, uc.UC_HOOK_CODE): # set callback with wrapper, so it can be called