Update python bindings

This commit is contained in:
lazymio 2021-11-04 21:47:30 +01:00
parent cd02c25802
commit 923b4ad3cc
No known key found for this signature in database
GPG Key ID: DFF27E34A47CB873
2 changed files with 241 additions and 1 deletions

View File

@ -0,0 +1,122 @@
# Unicorn Emulator Engine
# By Lazymio(@wtdcode), 2021
from unicorn import *
from unicorn.unicorn import UC_HOOK_EDGE_GEN_CB
from unicorn.x86_const import *
from datetime import datetime
def test_uc_ctl_read():
uc = Uc(UC_ARCH_X86, UC_MODE_32)
print("Reading some properties by uc_ctl.")
arch = uc.ctl_get_arch()
mode = uc.ctl_get_mode()
page_size = uc.ctl_get_page_size()
timeout = uc.ctl_get_timeout()
print(f">>> arch={arch} mode={mode} page size={page_size} timeout={timeout}")
def time_emulation(uc, start, end):
n = datetime.now()
uc.emu_start(start, end)
return (datetime.now() - n).total_seconds() * 1e6
def test_uc_ctl_tb_cache():
# Initialize emulator in X86-32bit mode
uc = Uc(UC_ARCH_X86, UC_MODE_32)
addr = 0x10000
# Fill the code buffer with NOP.
code = b"\x90" * 8 * 512
print("Controling the TB cache in a finer granularity by uc_ctl.")
uc.mem_map(addr, 0x10000)
# Write our code to the memory.
uc.mem_write(addr, code)
# Do emulation without any cache.
standard = time_emulation(uc, addr, addr + len(code))
# Now we request cache for all TBs.
for i in range(8):
tb = uc.ctl_request_cache(addr + i * 512)
print(f">>> TB is cached at {hex(tb.pc)} which has {tb.icount} instructions with {tb.size} bytes")
# Do emulation with all TB cached.
cached = time_emulation(uc, addr, addr + len(code))
# Now we clear cache for all TBs.
for i in range(8):
uc.ctl_remove_cache(addr + i * 512)
evicted = time_emulation(uc, addr, addr + len(code))
print(f">>> Run time: First time {standard}, Cached: {cached}, Cached evicted: {evicted}")
def trace_new_edge(uc, cur, prev, data):
print(f">>> Getting a new edge from {hex(prev.pc + prev.size - 1)} to {hex(cur.pc)}")
def trace_tcg_sub(uc, address, arg1, arg2, data):
print(f">>> Get a tcg sub opcode at {hex(address)} with args: {arg1} and {arg2}")
def test_uc_ctl_exits():
uc = Uc(UC_ARCH_X86, UC_MODE_32)
addr = 0x1000
# cmp eax, 0;
# jg lb;
# inc eax;
# nop;
# lb:
# inc ebx;
# nop;
code = b"\x83\xf8\x00\x7f\x02\x40\x90\x43\x90"
exits = [addr + 6, addr + 8]
print("Using multiple exits by uc_ctl")
uc.mem_map(addr, 0x1000)
# Write our code to the memory.
uc.mem_write(addr, code)
# We trace if any new edge is generated.
uc.hook_add(UC_HOOK_EDGE_GENERATED, trace_new_edge)
# Trace cmp instruction.
uc.hook_add(UC_HOOK_TCG_OPCODE, trace_tcg_sub, UC_TCG_OP_SUB, UC_TCG_OP_FLAG_CMP)
uc.ctl_exits_enabled(True)
uc.ctl_set_exits(exits)
# This should stop at ADDRESS + 6 and increase eax, even thouhg we don't provide an exit.
uc.emu_start(addr, 0)
eax = uc.reg_read(UC_X86_REG_EAX)
ebx = uc.reg_read(UC_X86_REG_EBX)
print(f">>> eax = {hex(eax)} and ebx = {hex(ebx)} after the first emulation")
# This should stop at ADDRESS + 8, even thouhg we don't provide an exit.
uc.emu_start(addr, 0)
eax = uc.reg_read(UC_X86_REG_EAX)
ebx = uc.reg_read(UC_X86_REG_EBX)
print(f">>> eax = {hex(eax)} and ebx = {hex(ebx)} after the first emulation")
if __name__ == "__main__":
test_uc_ctl_read()
print("="*32)
test_uc_ctl_tb_cache()
print("="*32)
test_uc_ctl_exits()

View File

@ -121,6 +121,13 @@ class _uc_mem_region(ctypes.Structure):
("perms", ctypes.c_uint32),
]
class uc_tb(ctypes.Structure):
""""TranslationBlock"""
_fields_ = [
("pc", ctypes.c_uint64),
("icount", ctypes.c_uint16),
("size", ctypes.c_uint16)
]
_setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int))
_setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int)
@ -152,6 +159,7 @@ _setup_prototype(_uc, "uc_context_free", ucerr, uc_context)
_setup_prototype(_uc, "uc_mem_regions", ucerr, uc_engine, ctypes.POINTER(ctypes.POINTER(_uc_mem_region)), ctypes.POINTER(ctypes.c_uint32))
# https://bugs.python.org/issue42880
_setup_prototype(_uc, "uc_hook_add", ucerr, uc_engine, ctypes.POINTER(uc_hook_h), ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64)
_setup_prototype(_uc, "uc_ctl", ucerr, uc_engine, ctypes.c_int)
UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p)
UC_HOOK_INSN_INVALID_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_void_p)
@ -180,6 +188,12 @@ UC_MMIO_READ_CB = ctypes.CFUNCTYPE(
UC_MMIO_WRITE_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint64, ctypes.c_int, ctypes.c_uint64, ctypes.c_void_p
)
UC_HOOK_EDGE_GEN_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.POINTER(uc_tb), ctypes.POINTER(uc_tb), ctypes.c_void_p
)
UC_HOOK_TCG_OPCODE_CB = ctypes.CFUNCTYPE(
None, uc_engine, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_void_p
)
# access to error code via @errno of UcError
class UcError(Exception):
@ -548,6 +562,16 @@ class Uc(object):
raise UcError(status)
return result.value
@_catch_hook_exception
def _hook_tcg_op_cb(self, handle, address, arg1, arg2, user_data):
(cb, data) = self._callbacks[user_data]
cb(self, address, arg1, arg2, user_data)
@_catch_hook_exception
def _hook_edge_gen_cb(self, handle, cur, prev, user_data):
(cb, data) = self._callbacks[user_data]
cb(self, cur.contents, prev.contents, user_data)
@_catch_hook_exception
def _hookcode_cb(self, handle, address, size, user_data):
# call user's callback with self object
@ -596,8 +620,86 @@ class Uc(object):
(cb, data) = self._callbacks[user_data]
cb(self, data)
def ctl(self, control, *args):
status = _uc.uc_ctl(self._uch, control, *args)
if status != uc.UC_ERR_OK:
raise UcError(status)
return status
def __ctl(self, ctl, nr, rw):
return ctl | (nr << 26) | (rw << 30)
def __ctl_r(self, ctl, nr):
return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ)
def __ctl_w(self, ctl, nr):
return self.__ctl(ctl, nr, uc.UC_CTL_IO_WRITE)
def __ctl_rw(self, ctl, nr):
return self.__ctl(ctl, nr, uc.UC_CTL_IO_READ_WRITE)
def __ctl_r_1_arg(self, ctl, ctp):
arg = ctp()
self.ctl(self.__ctl_r(ctl, 1), ctypes.byref(arg))
return arg.value
def __ctl_w_1_arg(self, ctl, val, ctp):
arg = ctp(val)
self.ctl(self.__ctl_w(ctl, 1), arg)
def __ctl_rw_1_1_arg(self, ctl, val, ctp1, ctp2):
arg1 = ctp1(val)
arg2 = ctp2()
self.ctl(self.__ctl_rw(ctl, 2), arg1, ctypes.byref(arg2))
return arg2
def ctl_get_mode(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_MODE, ctypes.c_int)
def ctl_get_page_size(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_PAGE_SIZE, ctypes.c_uint32)
def ctl_set_page_size(self, val):
self.__ctl_w_1_arg(uc.UC_CTL_UC_PAGE_SIZE, val, ctypes.c_uint32)
def ctl_get_arch(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_ARCH, ctypes.c_int)
def ctl_get_timeout(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_TIMEOUT, ctypes.c_uint64)
def ctl_exits_enabled(self, val):
self.__ctl_w_1_arg(uc.UC_CTL_UC_USE_EXITS, val, ctypes.c_int)
def ctl_get_exits_cnt(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_EXITS_CNT, ctypes.c_size_t)
def ctl_get_exits(self):
l = self.ctl_get_exits_cnt()
arr = (ctypes.c_uint64 * l)()
self.ctl(self.__ctl_r(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(l))
return [i for i in arr]
def ctl_set_exits(self, exits):
arr = (ctypes.c_uint64 * len(exits))()
for idx, exit in enumerate(exits):
arr[idx] = exit
self.ctl(self.__ctl_w(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(len(exits)))
def ctl_get_cpu_model(self):
return self.__ctl_r_1_arg(uc.UC_CTL_CPU_MODEL, ctypes.c_int)
def ctl_set_cpu_model(self, val):
self.__ctl_w_1_arg(uc.UC_CTL_CPU_MODEL, val, ctypes.c_int)
def ctl_remove_cache(self, addr):
self.__ctl_w_1_arg(uc.UC_CTL_TB_REMOVE_CACHE, addr, ctypes.c_uint64)
def ctl_request_cache(self, addr):
return self.__ctl_rw_1_1_arg(uc.UC_CTL_TB_REQUEST_CACHE, addr, ctypes.c_uint64, uc_tb)
# add a hook
def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0):
def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0, arg2=0):
_h2 = uc_hook_h()
# save callback & user_data
@ -618,6 +720,15 @@ class Uc(object):
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end), insn
)
elif htype == uc.UC_HOOK_TCG_OPCODE:
opcode = ctypes.c_int(arg1)
flags = ctypes.c_int(arg2)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, ctypes.cast(UC_HOOK_TCG_OPCODE_CB(self._hook_tcg_op_cb), UC_HOOK_TCG_OPCODE_CB),
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end), opcode, flags
)
elif htype == uc.UC_HOOK_INTR:
cb = ctypes.cast(UC_HOOK_INTR_CB(self._hook_intr_cb), UC_HOOK_INTR_CB)
status = _uc.uc_hook_add(
@ -632,6 +743,13 @@ class Uc(object):
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
elif htype == uc.UC_HOOK_EDGE_GENERATED:
cb = ctypes.cast(UC_HOOK_EDGE_GEN_CB(self._hook_edge_gen_cb), UC_HOOK_EDGE_GEN_CB)
status = _uc.uc_hook_add(
self._uch, ctypes.byref(_h2), htype, cb,
ctypes.cast(self._callback_count, ctypes.c_void_p),
ctypes.c_uint64(begin), ctypes.c_uint64(end)
)
else:
if htype in (uc.UC_HOOK_BLOCK, uc.UC_HOOK_CODE):
# set callback with wrapper, so it can be called