1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| import time
from unicorn import * from unicorn.x86_const import * from capstone import *
# --- Emulated address-space layout -----------------------------------------

# Address the code image is written to and where emulation starts.
CODE_ADDRESS = 0x74B000

# Base address and length of the region mapped as the stack.
STACK_ADDRESS = 0x19C000
STACK_SIZE = 0x80000

# Midpoint of the stack region.
ORIG_SP = STACK_ADDRESS + STACK_SIZE // 2

# Half-open address range [CODE_ADDRESS + 0x3fd0, CODE_ADDRESS + 0x137f0).
# NOTE(review): not referenced in the visible part of the script; the name
# suggests a region of jump blocks -- confirm before relying on it.
_JUMP_BLOCK_FIRST = CODE_ADDRESS + 0x3fd0
_JUMP_BLOCK_STOP = CODE_ADDRESS + 0x137f0
JUMP_BLOCK_RANGE = range(_JUMP_BLOCK_FIRST, _JUMP_BLOCK_STOP)
# Read the raw code image; the context manager closes the handle as soon as
# the bytes are in memory instead of leaving it open for the whole run.
with open("shellcode.bin", "rb") as shellcode_file:
    shellcode = shellcode_file.read()

# 32-bit x86 emulator instance.
mu = Uc(UC_ARCH_X86, UC_MODE_32)

# Map the code region starting at the 4 KiB page that contains CODE_ADDRESS.
# The length is the image size rounded up to whole pages, plus one extra page.
mu.mem_map(CODE_ADDRESS & ~0xFFF, ((len(shellcode) + 0xFFF) & ~0xFFF) + 0x1000)
mu.mem_map(STACK_ADDRESS, STACK_SIZE)
mu.mem_write(CODE_ADDRESS, shellcode)

# Initial stack pointer: offset (0x0019F998 - 0x0019C000) into the stack
# region, i.e. 0x0019F998 while STACK_ADDRESS is 0x0019C000.
# NOTE(review): presumably the ESP observed in the original process -- confirm.
mu.reg_write(UC_X86_REG_ESP, STACK_ADDRESS + 0x0019F998 - 0x0019C000)

# Disassembler used to render each traced instruction.
cs = Cs(CS_ARCH_X86, CS_MODE_32)

# Trace output file; it stays open because the code hook writes to it while
# emulation is running.
f = open("trace.txt", "w+")
class TraceInfo:
    """State captured for one instruction, plus the changes it produced.

    An instance is created when an instruction is about to execute. It stores
    the instruction address, its disassembly and a register snapshot.
    ``set_mem_change`` and ``set_regs_change`` are then called to record what
    the instruction changed, and ``get_state_trace_info`` renders one trace
    line.
    """

    def __init__(self, uc: "Uc", address, size):
        """Snapshot the instruction at ``address`` (``size`` bytes) and the registers.

        Args:
            uc: Emulator instance to read memory and registers from.
            address: Address of the instruction.
            size: Length of the instruction in bytes.
        """
        code = uc.mem_read(address, size)
        self.addr = address
        # Raw bytes are kept so the instruction can still be shown when the
        # disassembler yields nothing for it.
        self.raw = bytes(code)
        # A default of None avoids StopIteration escaping when nothing decodes.
        self.insn = next(cs.disasm(code, address), None)
        self.regs = self.get_registers(uc)
        self.regs_change = ""
        self.mem_change = ""

    def get_registers(self, uc: "Uc"):
        """Return the current general-purpose registers and EFLAGS keyed by name."""
        return {
            'eax': uc.reg_read(UC_X86_REG_EAX),
            'ebx': uc.reg_read(UC_X86_REG_EBX),
            'ecx': uc.reg_read(UC_X86_REG_ECX),
            'edx': uc.reg_read(UC_X86_REG_EDX),
            'esi': uc.reg_read(UC_X86_REG_ESI),
            'edi': uc.reg_read(UC_X86_REG_EDI),
            'esp': uc.reg_read(UC_X86_REG_ESP),
            'ebp': uc.reg_read(UC_X86_REG_EBP),
            'eflags': uc.reg_read(UC_X86_REG_EFLAGS),
        }

    def set_regs_change(self, uc: "Uc"):
        """Store every register whose value differs from the snapshot.

        The result is kept in ``self.regs_change`` as space-separated
        ``name: old-> new`` entries, or an empty string if nothing changed.
        """
        new_regs = self.get_registers(uc)
        self.regs_change = " ".join(
            f"{reg}: {hex(old_val)}-> {hex(new_regs[reg])}"
            for reg, old_val in self.regs.items()
            if old_val != new_regs[reg]
        )

    def set_mem_change(self, uc: "Uc", address, size, value):
        """Record a memory write of ``value`` (``size`` bytes) to ``address``.

        The bytes currently at ``address`` are read as the old value. Entries
        accumulate, separated by spaces, so an instruction that performs
        several writes keeps all of them instead of only the last one.
        """
        old_value = int.from_bytes(uc.mem_read(address, size), byteorder="little")
        # Reduce the value to the width of the access so it always prints as an
        # unsigned hex number.
        # NOTE(review): the hook is believed to deliver a signed 64-bit value
        # -- confirm against the binding.
        new_value = value & ((1 << (size * 8)) - 1)
        entry = f"{hex(address)}: {hex(old_value)}-> {hex(new_value)}"
        self.mem_change = f"{self.mem_change} {entry}" if self.mem_change else entry

    def get_state_trace_info(self) -> str:
        """Return one ``address | instruction | registers | memory`` trace line."""
        if self.insn is not None:
            insn_text = f"{self.insn.mnemonic} {self.insn.op_str}"
        else:
            # The disassembler produced nothing; show the raw bytes instead.
            insn_text = f"(undecoded) {self.raw.hex()}"
        addr_str = f"{hex(self.addr)}".ljust(10)
        insn_str = insn_text.ljust(40)
        regs_str = self.regs_change.ljust(60)
        mem_str = self.mem_change.ljust(60)
        return f"{addr_str} | {insn_str} | {regs_str} | {mem_str}"
# Record for the instruction currently being traced; stays None until the
# code hook has created the first one.
trace_info: "TraceInfo | None" = None


def hook_mem_write(uc, access, address, size, value, user_data):
    """Memory-write hook: attach the write to the current trace record.

    Args:
        uc: Emulator instance that triggered the hook.
        access: Access-type code supplied by the emulator (unused).
        address: Address being written.
        size: Number of bytes written.
        value: Value being written.
        user_data: Opaque hook argument (unused).
    """
    # Nothing to attach the write to if no instruction has been recorded yet.
    if trace_info is None:
        return
    trace_info.set_mem_change(uc, address, size, value)
# Number of instructions seen so far and the wall-clock time tracing began.
trace_count = 0
start_time = time.time()
# Time of the most recent progress line; used to limit progress output.
_last_progress_time = start_time


def hook_code(uc, address, size, user_data):
    """Code hook: finish the previous instruction's record and start a new one.

    The hook runs when an instruction is about to execute, so the register
    state seen here is the result of the *previous* instruction. That record
    is completed and written to the trace file, then a fresh ``TraceInfo`` is
    created for the instruction at ``address``.

    Args:
        uc: Emulator instance that triggered the hook.
        address: Address of the instruction about to execute.
        size: Length of that instruction in bytes.
        user_data: Opaque hook argument (unused).
    """
    global trace_info, trace_count, _last_progress_time

    trace_count += 1

    # Report progress at most about once per second. Comparing against the
    # start time alone would print on every instruction after the first second.
    current_time = time.time()
    if current_time - _last_progress_time > 1:
        _last_progress_time = current_time
        elapsed_time = current_time - start_time
        print(
            f"\rProcessed: {trace_count} instructions, Speed: {trace_count / elapsed_time:.2f} instructions/sec",
            end="",
            flush=True,  # no newline is printed, so flush to make the line visible
        )

    if trace_info is not None:
        trace_info.set_regs_change(uc)
        print(trace_info.get_state_trace_info(), file=f)

    trace_info = TraceInfo(uc, address, size)
# Register the tracing callbacks: memory writes and every executed instruction.
mu.hook_add(UC_HOOK_MEM_WRITE, hook_mem_write)
mu.hook_add(UC_HOOK_CODE, hook_code)

try:
    print("Starting execution...")
    # Run from the first byte of the image until execution reaches its end address.
    mu.emu_start(CODE_ADDRESS, CODE_ADDRESS + len(shellcode))
    print("Execution finished.")
except UcError as e:
    # Emulation failed: print ESP, EIP and the top 0x50 bytes of the stack.
    print(f"Unicorn execution failed: {e}")
    print(hex(mu.reg_read(UC_X86_REG_ESP)))
    print(hex(mu.reg_read(UC_X86_REG_EIP)))
    esp = mu.reg_read(UC_X86_REG_ESP)
    try:
        for i in range(esp, esp + 0x50, 4):
            print(hex(int.from_bytes(mu.mem_read(i, 4), byteorder="little")))
    except UcError as dump_error:
        # ESP may point at unmapped memory; do not let the dump hide the
        # original failure.
        print(f"Stack dump stopped: {dump_error}")
finally:
    # The code hook writes a record only when the *next* instruction begins,
    # so the last executed instruction is still pending here. Write it out.
    if trace_info is not None:
        trace_info.set_regs_change(mu)
        print(trace_info.get_state_trace_info(), file=f)
    # Close the trace file so everything buffered reaches disk.
    f.close()
|