Source code
Revision control
Copy as Markdown
Other Tools
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import argparse
import hashlib
import os
import threading
import frida
ARGUMENTS = {}
SOCKETS = {}
TERMINATE = threading.Condition()
def store_for_target(target, data):
filename = hashlib.sha1(data).hexdigest()
directory = os.path.join(ARGUMENTS["output"], target)
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, filename), "wb") as f:
f.write(data)
# --- certDN ---
def on_CERT_AsciiToName(payload):
if not "data" in payload:
return
store_for_target("certDN", payload["data"].encode())
# --- pkcs7 ---
def on_CERT_DecodeCertPackage(payload):
if not "data" in payload:
return
store_for_target("pkcs7", bytes(payload["data"].values()))
# --- pkcs8 ---
def on_PK11_ImportDERPrivateKeyInfoAndReturnKey(payload):
if not "data" in payload:
return
store_for_target("pkcs8", bytes(payload["data"].values()))
# --- pkcs12 ---
def on_SEC_PKCS12DecoderUpdate(payload):
if not "data" in payload:
return
store_for_target("pkcs12", bytes(payload["data"].values()))
# --- quickder ---
def on_SEC_QuickDERDecodeItem_Util(payload):
if not "data" in payload:
return
store_for_target("quickder", bytes(payload["data"].values()))
# --- TLS ---
def on_ssl_DefClose(payload):
ss = payload["ss"]
if not ss in SOCKETS:
return
# There is no way for us to determine (in a clean and future-proof)
# way the variant (DTLS/TLS) and origin (client/server) of the
# received data.
# Since you want to minimize the corpus anyway, we just say it belongs
# to all possible targets.
data = SOCKETS[ss].lstrip(b"\x00")
store_for_target("tls-client", data)
base_output_path = os.path.abspath(ARGUMENTS["output"])
for target in ["dtls-client", "dtls-server", "tls-client", "tls-server"]:
if not os.path.exists(os.path.join(base_output_path, target)):
os.symlink(os.path.join(base_output_path, "tls-client"),
os.path.join(base_output_path, target),
target_is_directory=True)
del SOCKETS[ss]
def on_ssl_DefRecv(payload):
if not "data" in payload:
return
ss = payload["ss"]
if not ss in SOCKETS:
SOCKETS[ss] = bytes()
SOCKETS[ss] += bytes(payload["data"].values())
def on_ssl_DefRead(payload):
if not "data" in payload:
return
ss = payload["ss"]
if not ss in SOCKETS:
SOCKETS[ss] = bytes()
SOCKETS[ss] += bytes(payload["data"].values())
def script_on_message(message, _data):
if message["type"] != "send":
print(message)
return
assert message["type"] == "send"
payload = message["payload"]
func = "on_" + payload["func"]
assert func in globals()
globals()[func](payload)
def session_on_detached():
with TERMINATE:
TERMINATE.notify_all()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--script", required=True, type=str)
parser.add_argument("--nss-build",
required=True,
type=str,
help="e.g. /path/to/dist/Debug")
parser.add_argument("--program", required=True, type=str)
parser.add_argument("--output", required=True, type=str)
args, programargs = parser.parse_known_args()
global ARGUMENTS
ARGUMENTS = vars(args)
with open(args.script, "r") as f:
script = f.read()
pid = frida.spawn(program=args.program,
argv=programargs,
env={
**os.environ, "LD_LIBRARY_PATH":
os.path.join(args.nss_build, "lib")
})
session = frida.attach(pid)
script = session.create_script(script)
script.load()
script.on("message", script_on_message)
session.resume()
frida.resume(pid)
session.on("detached", session_on_detached)
with TERMINATE:
TERMINATE.wait()
if __name__ == "__main__":
main()