mirror of https://github.com/geohot/qira
285 lines
7.8 KiB
Python
Executable File
285 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python2.7
|
|
from __future__ import print_function
|
|
|
|
# NO MORE RADARE
|
|
# tags should be dynamically generated
|
|
# like when you request the 'instruction' tag, it triggers the disassembly
|
|
# when you set the 'name' tag, it dedups names, and updates the reverse index
|
|
# when you set the 'scope' tag, it adds it as a member of the function
|
|
# so it's a "managed" key value store
|
|
# don't worry at all about caching unless things are too slow
|
|
|
|
# stuff from Program should be moved here
|
|
# this class should contain all of the information about an independent run of the binary
|
|
# move the webserver code out of here, and perhaps into qira_webserver
|
|
|
|
|
|
# *** EXISTING TAGS ***
|
|
# len -- bytes that go with this one
|
|
# name -- name of this address
|
|
# comment -- comment on this address
|
|
# instruction -- string of this instruction
|
|
# arch -- arch of this instruction
|
|
# crefs -- code xrefs
|
|
# type -- type of instruction
|
|
|
|
|
|
# objects are allowed in the key-value store,
|
|
# but they should do something sane for the javascript on repr
|
|
|
|
# fhex and ghex shouldn't be used
|
|
# all addresses are numbers
|
|
|
|
import collections
|
|
import os, sys
|
|
import re
|
|
import pickle
|
|
import atexit
|
|
from hashlib import sha1
|
|
|
|
sys.path.append("../middleware")
|
|
import qira_config
|
|
|
|
from model import *
|
|
|
|
import loader
|
|
import analyzer
|
|
|
|
# the new interface for all things static
|
|
# mostly tags, except for names and functions
|
|
class Static:
|
|
def __init__(self, path, debug=1):
|
|
self.tags = {}
|
|
self.path = path
|
|
self.debug = debug
|
|
|
|
# radare doesn't seem to have a concept of names
|
|
# doesn't matter if this is in the python
|
|
self.rnames = {}
|
|
|
|
# fall through on an instruction
|
|
# 'arch'
|
|
self.global_tags = Tags(self)
|
|
self.global_tags['functions'] = set()
|
|
self.global_tags['blocks'] = set()
|
|
self.global_tags['segments'] = []
|
|
|
|
# concept from qira_program
|
|
self.base_memory = {}
|
|
|
|
self.analyzer = analyzer
|
|
loader.load_binary(self)
|
|
|
|
if self.debug >= 1:
|
|
print("*** elf loaded")
|
|
|
|
"""
|
|
# create the static cache dir
|
|
try:
|
|
os.mkdir(qira_config.STATIC_CACHE_BASE)
|
|
except:
|
|
pass
|
|
self.scf = qira_config.STATIC_CACHE_BASE + sha1(open(self.path, "rb").read()).hexdigest()
|
|
# check the cache
|
|
if os.path.isfile(self.scf):
|
|
# cache is global_tags + tags
|
|
with open(self.scf) as f:
|
|
try:
|
|
dd = pickle.load(f)
|
|
print "*** read %d bytes from static cache" % f.tell()
|
|
except:
|
|
dd = None
|
|
print "*** static cache corrupt, ignoring"
|
|
if dd != None:
|
|
self.deserialize(dd)
|
|
pass
|
|
|
|
# register cache writing
|
|
def write_cache():
|
|
with open(self.scf, "wb") as f:
|
|
dat = self.serialize()
|
|
pickle.dump(dat, f)
|
|
print "*** wrote %d bytes to static cache" % f.tell()
|
|
|
|
atexit.register(write_cache)
|
|
"""
|
|
|
|
def serialize(self):
|
|
def blacklist(d):
|
|
ret = {}
|
|
for k in d:
|
|
#if k == "instruction":
|
|
if k != "name":
|
|
continue
|
|
ret[k] = d[k]
|
|
return ret
|
|
kk = self.tags.keys()
|
|
vv = map(lambda x: blacklist(self.tags[x].backing), kk)
|
|
return self.global_tags.backing, kk, vv
|
|
|
|
def deserialize(self, dat):
|
|
gt, kk, vv = dat
|
|
for k in gt:
|
|
self[k] = gt[k]
|
|
|
|
for address, dd in zip(kk, vv):
|
|
for k in dd:
|
|
self[address][k] = dd[k]
|
|
|
|
# this should be replaced with a
|
|
def set_name(self, address, name):
|
|
if name not in self.rnames:
|
|
self.rnames[name] = address
|
|
elif address != self.rnames[name]:
|
|
# add underscore if name already exists
|
|
return self.set_name(address, name+"_")
|
|
return name
|
|
|
|
def _auto_update_name(self, address, name):
|
|
'''modifies the name of address based on data from analyses
|
|
but if we already have a name (from a user or symbols) do nothing'''
|
|
if not self[address]['name']:
|
|
self[address]['name'] = name
|
|
|
|
def get_address_by_name(self, name):
|
|
if name in self.rnames:
|
|
return self.rnames[name]
|
|
else:
|
|
return None
|
|
|
|
def _insert_names(self,st):
|
|
'''TODO kind of fugly
|
|
takes in a string and replaces things like 0x???????? with
|
|
the name of that address, if it exists
|
|
doesn't make sense to be used externally...'''
|
|
st = str(st)
|
|
m = map(lambda x:int(x,16),re.findall(r"(?<=0x)[0-9a-f]+",st))
|
|
for val in m:
|
|
if self[val]['name']:
|
|
st = st.replace(hex(val),self[val]['name'])
|
|
return st
|
|
|
|
# keep the old tags interface
|
|
# names and function data no longer stored here
|
|
# things like xrefs can go here
|
|
# only write functional tags here
|
|
# comment -- comment on this address
|
|
# len -- number of bytes grouped with this one
|
|
# instruction -- string of this instruction
|
|
# type -- unset, 'instruction', 'data', 'string'
|
|
def get_tags(self, filt, addresses=None):
|
|
ret = {}
|
|
if addresses == None:
|
|
# all the addresses
|
|
addresses = self.tags.keys()
|
|
for a in addresses:
|
|
rret = {}
|
|
for f in filt:
|
|
t = self[a][f]
|
|
if t != None:
|
|
rret[f] = t
|
|
if rret != {}:
|
|
ret[a] = rret
|
|
return ret
|
|
|
|
def __setitem__(self, address, dat):
|
|
if type(address) is str:
|
|
self.global_tags[address] = dat
|
|
|
|
# for a single address
|
|
def __getitem__(self, address):
|
|
if type(address) is str:
|
|
if address in self.global_tags:
|
|
return self.global_tags[address]
|
|
else:
|
|
return None
|
|
if address not in self.tags:
|
|
self.tags[address] = Tags(self, address)
|
|
return self.tags[address]
|
|
|
|
# return the memory at address:ln
|
|
# replaces get_static_bytes
|
|
# TODO: refactor this!
|
|
def memory(self, address, ln):
|
|
dat = []
|
|
def ret():
|
|
if (sys.version_info > (3, 0)):
|
|
return bytes(dat)
|
|
else:
|
|
return ''.join(dat)
|
|
for i in range(ln):
|
|
ri = address+i
|
|
|
|
# hack for "RuntimeError: dictionary changed size during iteration"
|
|
for (ss, se) in self.base_memory.keys():
|
|
if ss <= ri and ri < se:
|
|
try:
|
|
dat.append(self.base_memory[(ss,se)][ri-ss])
|
|
break
|
|
except:
|
|
return ret()
|
|
return ret()
|
|
|
|
def add_memory_chunk(self, address, dat):
|
|
#print "add segment",hex(address),len(dat)
|
|
# check for dups
|
|
for (laddress, llength) in self.base_memory:
|
|
if address == laddress:
|
|
if self.base_memory[(laddress, llength)] != dat:
|
|
print("*** WARNING, changing segment",hex(laddress),llength)
|
|
return
|
|
|
|
# segments should have an idea of segment permission
|
|
self['segments'].append((address, len(dat)))
|
|
self.base_memory[(address, address+len(dat))] = dat
|
|
|
|
def process(self):
|
|
self.analyzer.analyze_functions(self)
|
|
if self.debug >= 1:
|
|
print("*** static found %d functions" % len(self['functions']))
|
|
|
|
|
|
# *** STATIC TEST STUFF ***
|
|
|
|
if __name__ == "__main__":
|
|
static = Static(sys.argv[1],debug=1)
|
|
print("arch:",static['arch'])
|
|
|
|
# find main
|
|
static.process()
|
|
"""
|
|
main = static.get_address_by_name("main")
|
|
print "main is at", main
|
|
recursive.make_function_at(static, static['entry'])
|
|
print "found %d functions" % len(static['functions'])
|
|
recursive.make_function_at(static, main)
|
|
print "found %d functions" % len(static['functions'])
|
|
"""
|
|
|
|
|
|
# function printer
|
|
for f in sorted(static['functions']):
|
|
print(static[f.start]['name'] or hex(f.start), f)
|
|
for b in sorted(f.blocks):
|
|
print(" ",b)
|
|
for a in sorted(b.addresses):
|
|
print(" ",hex(a),static._insert_names(static[a]['instruction']))
|
|
|
|
|
|
# print symbols
|
|
print("symbols")
|
|
names = static.get_tags(['name'])
|
|
for addr in names:
|
|
print("%8x: %s" % (addr, names[addr]['name']))
|
|
|
|
#print static['functions']
|
|
|
|
#print static[main]['instruction'], map(hex, static[main]['crefs'])
|
|
#print static.get_tags(['name'])
|
|
#bw_functions = byteweight.fsi(static)
|
|
#for f in bw_functions:
|
|
#print hex(f)
|
|
#hexdump(static.memory(f, 0x20))
|
|
|
|
|