dirty-ghidra / main.py
ejschwartz's picture
Improve dup location and fix duplicate binary bug.
f99e17c
raw
history blame
7.54 kB
import gradio as gr
import subprocess
import tempfile
import itertools
import os
import sys
import hashlib
import json
# Directory that holds the Ghidra projects created by this app (one project
# per analyzed binary, named after the binary's SHA-256 digest).
# expanduser() is used instead of os.getenv('HOME') so that an unset HOME does
# not silently produce the relative path "None/ghidra_project".
GHIDRA_PROJECT_DIR = os.path.join(os.path.expanduser("~"), "ghidra_project")

os.makedirs(GHIDRA_PROJECT_DIR, exist_ok=True)
def hash_file(file):
    """Return the hexadecimal SHA-256 digest of the file at path *file*.

    The file is read in 4096-byte chunks so that large binaries are never
    loaded into memory all at once.
    """
    digest = hashlib.sha256()
    with open(file, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if chunk == b"":
                break
            digest.update(chunk)
    return digest.hexdigest()
def get_functions(file):
    """Import *file* into Ghidra and return the function listing it produces.

    The binary is imported into a Ghidra project named after the file's
    SHA-256 digest, then the ``dump_functions.py`` post-script is run with a
    temporary ``funcs.json`` path as its argument. The JSON written there is
    loaded and returned.

    Args:
        file: Path of the binary to analyze.

    Returns:
        The object decoded from the ``funcs.json`` file written by the
        post-script.

    Raises:
        gr.Error: If the import fails for any reason other than the program
            already existing in the project, or if the post-script does not
            produce ``funcs.json``.
    """
    file_hash = hash_file(file)
    analyze_headless = "/ghidra/support/analyzeHeadless"

    with tempfile.TemporaryDirectory() as temp_dir:
        out_path = os.path.join(temp_dir, "funcs.json")

        # First import the file. The command is passed as an argument list
        # (no shell) so that spaces or shell metacharacters in the uploaded
        # file's path cannot break or inject into the command line.
        # stderr is merged into stdout, matching the former "2>&1".
        o = subprocess.run(
            [
                analyze_headless,
                GHIDRA_PROJECT_DIR,
                file_hash,
                "-import",
                str(file),
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding="utf8",
            errors="replace",
        )

        # A non-zero exit is tolerated only when Ghidra reports that the same
        # program is already in the project (i.e. it was imported earlier).
        if o.returncode != 0:
            if "Found conflicting program file in project:" not in o.stdout:
                raise gr.Error(f"Unable to run Ghidra on {file}: {o.stdout}")

        # Then run the post-script that dumps the functions to out_path.
        o = subprocess.run(
            [
                analyze_headless,
                GHIDRA_PROJECT_DIR,
                file_hash,
                "-process",
                "-postscript",
                "/home/user/app/scripts/dump_functions.py",
                out_path,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding="utf8",
            errors="replace",
        )

        if not os.path.exists(out_path):
            raise gr.Error(f"DIRTY Ghidra failed to produce output: {o.stdout}")

        # Read inside the TemporaryDirectory context (the file is deleted on
        # exit) and close the handle deterministically.
        with open(out_path) as f:
            json_funcs = json.load(f)

    return json_funcs
with gr.Blocks() as demo:
    # Per-session dictionary shared between the callbacks below. The callbacks
    # store the keys "file", "file_hash" and "cfs" in it.
    state = gr.State()

    intro = gr.Markdown(
        """
# DIRTY-Ghidra Inference Demo
Welcome! This is a demo of DIRTY-Ghidra, a tool that predict names and types for variables for Ghidra's decompiler.
To get started, upload a binary or select one of the example binaries below. Uploading a binary requires decompiling each function in the binary, which can take a few minutes.
## TODOs
* Make predictions for variables in non-unique storage locations
"""
    )

    file_widget = gr.File(label="Executable file")

    # Hidden until a binary has been analyzed; file_change_fn toggles it.
    # NOTE(review): the results row is assumed to live inside this column and
    # the examples widget outside of it — confirm against the intended layout.
    with gr.Column(visible=False) as col:
        # output = gr.Textbox("Output")

        gr.Markdown(
            """
Great, you selected an executable! Now pick the function you would like
to analyze.
"""
        )

        fun_dropdown = gr.Dropdown(
            label="Select a function", choices=["Woohoo!"], interactive=True
        )

        gr.Markdown(
            """
Below you can find some information.
"""
        )

        with gr.Row(visible=True) as result:
            disassembly = gr.Code(
                label="Disassembly", lines=20,
                #min_width=400
            )
            original_decompile = gr.Code(
                language="c",
                label="Original Decompilation", lines=20,
                #min_width=400
            )
            decompile = gr.Code(
                language="c",
                label="Renamed and retyped Decompilation",
                lines=20,
                #min_width=400
            )
            model_output = gr.JSON(
                label="Model Output",
                #min_width=400
            )
            # with gr.Column():
            #    clazz = gr.Label()
            #    interpret_button = gr.Button("Interpret (very slow)")
            #    interpretation = gr.components.Interpretation(disassembly)

    # Every file in the "examples" directory next to this script is offered
    # as a selectable example input.
    example_widget = gr.Examples(
        examples=[f.path for f in os.scandir(os.path.join(os.path.dirname(__file__), "examples"))],
        inputs=file_widget,
        outputs=[state, disassembly, original_decompile, decompile, model_output],
    )

    def file_change_fn(file, progress=gr.Progress()):
        """React to a new (or cleared) binary in the file widget.

        Args:
            file: Value of the file widget, or None when it was cleared. Its
                ``name`` attribute is used as the path of the binary.
            progress: Gradio progress tracker. Declared as a default argument,
                like in function_change_fn, so that Gradio injects it.

        Returns:
            A dict of component updates: when *file* is None the function
            column is hidden and the state reset; otherwise the column is
            shown, the dropdown is filled with the functions found and the
            state receives the file, its hash and a name -> ``cf`` mapping.

        Raises:
            gr.Error: If the analysis fails or reports no functions.
        """
        if file is None:
            return {col: gr.update(visible=False), state: {"file": None}}

        try:
            progress(
                0,
                desc=f"Analyzing binary {os.path.basename(file.name)} with Ghidra...",
            )
            # fun_data maps an address (convertible with int()) to a
            # (name, cf, numvars) triple, as unpacked below.
            fun_data = get_functions(file.name)
            # print(fun_data)

            # Dropdown choices are (label, value) pairs; the value is the
            # integer address that function_change_fn receives.
            addrs = [
                (f"{name} ({hex(int(addr))}; {numvars} vars)", int(addr))
                for addr, (name, cf, numvars) in fun_data.items()
            ]
            cfs = {name: cf for (name, cf, _numvars) in fun_data.values()}
        except Exception as e:
            raise gr.Error(f"Unable to analyze binary with Ghidra: {e}") from e

        # Without this guard addrs[0] below would raise a bare IndexError.
        if not addrs:
            raise gr.Error("Ghidra did not find any functions in this binary.")

        return {
            col: gr.Column(visible=True),
            fun_dropdown: gr.Dropdown(choices=addrs, value=addrs[0][1]),
            state: {"file": file,
                    "file_hash": hash_file(file.name),
                    "cfs": cfs},
        }

    def function_change_fn(selected_fun, state, progress=gr.Progress()):
        """Run the DIRTY inference post-script on the selected function.

        Args:
            selected_fun: Integer address chosen in the function dropdown.
            state: Session dict; its "file_hash" entry names the Ghidra
                project to process.
            progress: Gradio progress tracker injected by Gradio.

        Returns:
            A dict of updates for the disassembly, both decompilation views
            and the model-output JSON component.

        Raises:
            gr.Error: If nothing is selected, Ghidra exits with an error, no
                output file is produced, the output cannot be parsed, or the
                output contains an "exception" entry.
        """
        # disassembly_str = fun_data[int(selected_fun, 16)].decode("utf-8")
        # load_results = model.fn(disassembly_str)
        # top_k = {e['label']: e['confidence'] for e in load_results['confidences']}

        # Fail with a readable message instead of a TypeError/KeyError when
        # the callback fires without a selection or an analyzed binary.
        if selected_fun is None or not state or "file_hash" not in state:
            raise gr.Error("Please select a binary and a function first.")

        with tempfile.TemporaryDirectory() as temp_dir:
            out_path = os.path.join(temp_dir, "funcs.json")

            progress(0, desc=f"Running DIRTY Ghidra on {hex(selected_fun)}...")

            # Argument list instead of a shell string; stderr is merged into
            # stdout, matching the former "2>&1".
            o = subprocess.run(
                [
                    "/ghidra/support/analyzeHeadless",
                    GHIDRA_PROJECT_DIR,
                    str(state["file_hash"]),
                    "-process",
                    "-postscript",
                    "/DIRTY/scripts/DIRTY_infer.py",
                    out_path,
                    str(selected_fun),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf8",
                errors="replace",
            )

            if o.returncode != 0:
                raise gr.Error(f"Unable to run Ghidra: {o.stdout}")

            if not os.path.exists(out_path):
                raise gr.Error(f"DIRTY Ghidra failed to produce output: {o.stdout}")

            try:
                # Context manager so the handle is closed before the
                # temporary directory is removed.
                with open(out_path) as f:
                    json_info = json.load(f)
            except Exception as e:
                raise gr.Error(f"Unable to parse DIRTY Ghidra output: {e}\n{o.stdout}")

            if "exception" in json_info:
                raise gr.Error(f"DIRTY Ghidra failed: {json_info['exception']}")

            #print(json_info)

            # group by location: collect the keys of "source_filtered" that
            # share the same value. groupby() only merges adjacent items, so
            # the pairs are sorted by that value first.
            src_filtered = json_info['other_info']['source_filtered']
            keyfunc = lambda x: x[1]
            src_filtered = sorted(src_filtered.items(), key=keyfunc)
            src_filtered = {k: [v1 for v1, v2 in v] for k, v in itertools.groupby(src_filtered, keyfunc)}

            model_output_info = {
                'model_output': json_info["model_output"],
                'dup_location_vars': src_filtered
            }

            # NOTE(review): the three code views are gr.Code components but
            # are updated through gr.Textbox(value=...) — confirm intended.
            return {
                disassembly: gr.Textbox(value=json_info["disassembly"]),
                original_decompile: gr.Textbox(value=json_info["original_decompile"]),
                decompile: gr.Textbox(value=json_info["decompile"]),
                model_output: gr.JSON(value=json.dumps(model_output_info)),
            }

    # Need to put intro as output to get progress to work!
    file_widget.change(
        file_change_fn, file_widget, outputs=[intro, state, col, fun_dropdown]
    )

    fun_dropdown.change(
        function_change_fn,
        inputs=[fun_dropdown, state],
        outputs=[disassembly, original_decompile, decompile, model_output],
    )
# spaces only shows stderr..
# NOTE(review): os.dup2(fd, fd2) makes fd2 refer to the same file as fd, so
# the call below points the stderr descriptor at stdout's destination. That is
# the opposite direction from what the comment above suggests — confirm which
# stream the hosting environment actually displays before changing it.
os.dup2(sys.stdout.fileno(), sys.stderr.fileno())
# Turn on Gradio's queue, then serve the app on all interfaces at port 7860.
demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)