qira/static2/static2.py

285 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python2.7
from __future__ import print_function
# NO MORE RADARE
# tags should be dynamically generated
# like when you request the 'instruction' tag, it triggers the disassembly
# when you set the 'name' tag, it dedups names, and updates the reverse index
# when you set the 'scope' tag, it adds it as a member of the function
# so it's a "managed" key value store
# don't worry at all about caching unless things are too slow
# stuff from Program should be moved here
# this class should contain all of the information about an independent run of the binary
# move the webserver code out of here, and perhaps into qira_webserver
# *** EXISTING TAGS ***
# len -- bytes that go with this one
# name -- name of this address
# comment -- comment on this address
# instruction -- string of this instruction
# arch -- arch of this instruction
# crefs -- code xrefs
# type -- type of instruction
# objects are allowed in the key-value store,
# but they should do something sane for the javascript on repr
# fhex and ghex shouldn't be used
# all addresses are numbers
import collections
import os, sys
import re
import pickle
import atexit
from hashlib import sha1
sys.path.append("../middleware")
import qira_config
from model import *
import loader
import analyzer
# the new interface for all things static
# mostly tags, except for names and functions
class Static:
    """Managed key-value store holding the static-analysis state of one binary.

    String keys (for example ``'functions'``, ``'blocks'``, ``'segments'``)
    address the global tag store.  Any other key is treated as an address and
    maps to a per-address ``Tags`` object that is created on first access.
    """

    def __init__(self, path, debug=1):
        """Create the empty stores and load the binary through ``loader``.

        Args:
            path: path of the binary; kept as ``self.path``.
            debug: verbosity level; progress lines are printed when ``>= 1``.
        """
        self.tags = {}
        self.path = path
        self.debug = debug

        # radare doesn't seem to have a concept of names
        # doesn't matter if this is in the python
        # reverse index: name -> address
        self.rnames = {}

        # fall through on an instruction
        # 'arch'
        self.global_tags = Tags(self)
        self.global_tags['functions'] = set()
        self.global_tags['blocks'] = set()
        self.global_tags['segments'] = []

        # concept from qira_program
        # maps (start, end) address pairs to the bytes of that chunk
        self.base_memory = {}

        self.analyzer = analyzer
        loader.load_binary(self)

        if self.debug >= 1:
            print("*** elf loaded")

        # NOTE: the on-disk static cache below is disabled; it is kept as an
        # inert string literal exactly as found.
        """
        # create the static cache dir
        try:
        os.mkdir(qira_config.STATIC_CACHE_BASE)
        except:
        pass
        self.scf = qira_config.STATIC_CACHE_BASE + sha1(open(self.path, "rb").read()).hexdigest()
        # check the cache
        if os.path.isfile(self.scf):
        # cache is global_tags + tags
        with open(self.scf) as f:
        try:
        dd = pickle.load(f)
        print "*** read %d bytes from static cache" % f.tell()
        except:
        dd = None
        print "*** static cache corrupt, ignoring"
        if dd != None:
        self.deserialize(dd)
        pass
        # register cache writing
        def write_cache():
        with open(self.scf, "wb") as f:
        dat = self.serialize()
        pickle.dump(dat, f)
        print "*** wrote %d bytes to static cache" % f.tell()
        atexit.register(write_cache)
        """

    def serialize(self):
        """Return ``(global_backing, addresses, per_address_tags)``.

        Only the ``'name'`` tag of every address is kept.  ``addresses`` and
        ``per_address_tags`` are plain lists of equal length (not a dict view
        and a lazy ``map``), so the result can be iterated more than once and
        does not hold a lambda-backed iterator.
        """
        def blacklist(d):
            # despite its name this keeps only the 'name' tag
            #if k == "instruction":
            return {k: d[k] for k in d if k == "name"}

        kk = list(self.tags)
        vv = [blacklist(self.tags[x].backing) for x in kk]
        return self.global_tags.backing, kk, vv

    def deserialize(self, dat):
        """Restore state from a tuple shaped like the result of ``serialize``.

        Args:
            dat: ``(global_tags, addresses, per_address_tags)``.
        """
        gt, kk, vv = dat
        for k in gt:
            self[k] = gt[k]
        for address, dd in zip(kk, vv):
            for k in dd:
                self[address][k] = dd[k]

    # this should be replaced with a ... (original note is truncated)
    def set_name(self, address, name):
        """Register *name* for *address* in the reverse index.

        If the name already belongs to a different address, underscores are
        appended until the name is free or already maps to *address*.

        Returns:
            The name that ended up being associated with *address*.
        """
        while name in self.rnames and self.rnames[name] != address:
            # add underscore if name already exists
            name += "_"
        self.rnames.setdefault(name, address)
        return name

    def _auto_update_name(self, address, name):
        '''modifies the name of address based on data from analyses
        but if we already have a name (from a user or symbols) do nothing'''
        if not self[address]['name']:
            self[address]['name'] = name

    def get_address_by_name(self, name):
        """Return the address registered for *name*, or ``None`` if unknown."""
        return self.rnames.get(name)

    def _insert_names(self, st):
        '''TODO kind of fugly
        takes in a string and replaces things like 0x???????? with
        the name of that address, if it exists
        doesn't make sense to be used externally...'''
        def name_or_literal(match):
            name = self[int(match.group(1), 16)]['name']
            # keep the literal untouched when the address has no name
            return name if name else match.group(0)

        # Substitute per match instead of str.replace(hex(val), ...): the
        # latter also rewrites the prefix of longer literals (0x40 inside
        # 0x4000) and misses literals written with leading zeros.
        return re.sub(r"0x([0-9a-f]+)", name_or_literal, str(st))

    # keep the old tags interface
    # names and function data no longer stored here
    # things like xrefs can go here
    # only write functional tags here
    # comment -- comment on this address
    # len -- number of bytes grouped with this one
    # instruction -- string of this instruction
    # type -- unset, 'instruction', 'data', 'string'
    def get_tags(self, filt, addresses=None):
        """Collect the tags named in *filt* for a set of addresses.

        Args:
            filt: iterable of tag names to look up.
            addresses: addresses to query; defaults to every address that
                currently has a tag store.

        Returns:
            ``{address: {tag: value}}`` holding only tags whose value is not
            ``None``; addresses with no such tag are left out.
        """
        ret = {}
        if addresses is None:
            # all the addresses; snapshot because reading a tag goes through
            # __getitem__, which may add entries to self.tags
            addresses = list(self.tags)
        for a in addresses:
            rret = {}
            for f in filt:
                t = self[a][f]
                if t is not None:
                    rret[f] = t
            if rret:
                ret[a] = rret
        return ret

    def __setitem__(self, address, dat):
        """Set a global tag; only string keys are stored, others are ignored."""
        if isinstance(address, str):
            self.global_tags[address] = dat

    # for a single address
    def __getitem__(self, address):
        """Return a global tag (string key) or the tag store of an address.

        A missing global tag yields ``None``.  A tag store for an address is
        created on first access.
        """
        if isinstance(address, str):
            if address in self.global_tags:
                return self.global_tags[address]
            return None
        if address not in self.tags:
            self.tags[address] = Tags(self, address)
        return self.tags[address]

    # return the memory at address:ln
    # replaces get_static_bytes
    # TODO: refactor this!
    def memory(self, address, ln):
        """Return the loaded bytes in ``[address, address + ln)``.

        Addresses not covered by any chunk contribute nothing, so the result
        may be shorter than *ln*.  If reading a chunk fails, whatever was
        collected so far is returned.

        Returns:
            ``bytes`` on Python 3, ``str`` on Python 2.
        """
        dat = []

        def ret():
            if sys.version_info > (3, 0):
                return bytes(dat)
            return ''.join(dat)

        # Snapshot the chunks once: avoids "RuntimeError: dictionary changed
        # size during iteration" (dict.keys() is a live view on Python 3).
        segments = list(self.base_memory.items())
        for i in range(ln):
            ri = address + i
            for (ss, se), chunk in segments:
                if ss <= ri < se:
                    try:
                        dat.append(chunk[ri - ss])
                    except Exception:
                        return ret()
                    break
        return ret()

    def add_memory_chunk(self, address, dat):
        """Register *dat* as the memory chunk starting at *address*.

        If a chunk already starts at *address* the existing one is kept; a
        warning is printed when its contents differ from *dat*.
        """
        #print "add segment",hex(address),len(dat)
        # check for dups; keys of base_memory are (start, end) pairs
        for (lstart, lend) in list(self.base_memory):
            if address == lstart:
                if self.base_memory[(lstart, lend)] != dat:
                    print("*** WARNING, changing segment", hex(lstart), lend - lstart)
                return
        # segments should have an idea of segment permission
        self['segments'].append((address, len(dat)))
        self.base_memory[(address, address + len(dat))] = dat

    def process(self):
        """Run the analyzer's function analysis and report how many were found."""
        self.analyzer.analyze_functions(self)
        if self.debug >= 1:
            print("*** static found %d functions" % len(self['functions']))
# *** STATIC TEST STUFF ***
if __name__ == "__main__":
    # Stand-alone smoke test: load the binary named on the command line,
    # run the function analysis and dump what was found.
    static = Static(sys.argv[1], debug=1)
    print("arch:", static['arch'])

    # find main
    static.process()

    """
    main = static.get_address_by_name("main")
    print "main is at", main
    recursive.make_function_at(static, static['entry'])
    print "found %d functions" % len(static['functions'])
    recursive.make_function_at(static, main)
    print "found %d functions" % len(static['functions'])
    """

    # function printer: each function, then its blocks, then the
    # instructions of every block with known addresses replaced by names
    for func in sorted(static['functions']):
        label = static[func.start]['name'] or hex(func.start)
        print(label, func)
        for block in sorted(func.blocks):
            print(" ", block)
            for insn_addr in sorted(block.addresses):
                text = static._insert_names(static[insn_addr]['instruction'])
                print(" ", hex(insn_addr), text)

    # print symbols
    print("symbols")
    for addr, addr_tags in static.get_tags(['name']).items():
        print("%8x: %s" % (addr, addr_tags['name']))

    #print static['functions']
    #print static[main]['instruction'], map(hex, static[main]['crefs'])
    #print static.get_tags(['name'])
    #bw_functions = byteweight.fsi(static)
    #for f in bw_functions:
    #print hex(f)
    #hexdump(static.memory(f, 0x20))