Lots of Updates (expand for more):
- Started implementing new parser for chunked data - Started documenting data formats - Started dissector for network protocol - Added AI-Graph renderer (converts .pth files to python data you can import into Blender) - Added Script to convert savefile to JSON - Added (old) parser for chunked data format - Added basic parser for LFVF data section (Vertex Data) - Added script to analyze and filter read trace generated with frida script - Added various Frida scripts
This commit is contained in:
parent
aabacafd9c
commit
8d92f25b8c
47 changed files with 2744 additions and 411 deletions
68
tools/analyze_read_trace.py
Normal file
68
tools/analyze_read_trace.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import msgpack as mp
|
||||
import sys
|
||||
import os
|
||||
from tqdm import tqdm
|
||||
import struct
|
||||
import binascii
|
||||
import string
|
||||
import re
|
||||
from binascii import hexlify
|
||||
|
||||
def gen():
|
||||
with open(sys.argv[1], "rb") as fh:
|
||||
size = os.stat(sys.argv[1]).st_size
|
||||
progbar = tqdm(total=size, unit="bytes", unit_scale=True, unit_divisor=1024)
|
||||
pos = 0
|
||||
for entry in mp.Unpacker(fh, raw=True):
|
||||
progbar.update(fh.tell() - pos)
|
||||
pos = fh.tell()
|
||||
for k in entry.copy():
|
||||
k_s = str(k, "utf8")
|
||||
if k_s not in ["data", "stack", "timestamp"]:
|
||||
entry[k] = str(entry.pop(k), "utf8")
|
||||
entry[k_s] = entry.pop(k)
|
||||
entry["stack"] = "|".join(
|
||||
["{:08X}".format(int(str(v, "utf8"), 16)) for v in entry["stack"][::-1]]
|
||||
)
|
||||
yield entry
|
||||
|
||||
|
||||
|
||||
def strdump(data):
|
||||
printable_chars = set(bytes(string.printable, "ascii")) - set(b"\n\r\t\x0b\x0c")
|
||||
return "".join(chr(c) if c in printable_chars else "." for c in data)
|
||||
|
||||
def tohex(data):
|
||||
return str(hexlify(data), "utf8").upper()
|
||||
|
||||
|
||||
# best=sorted(tqdm(gen(),ascii=True),key=lambda v:len(v['data']),reverse=True)
|
||||
|
||||
# def score(entry):
|
||||
# return len(entry['data'])
|
||||
|
||||
# def analyze(entry):
|
||||
# data=entry['data']
|
||||
# entry['infos'] = {
|
||||
# 'len':len(data),
|
||||
# }
|
||||
# for bo in "><":
|
||||
# for t in "hHiIlLqQefd":
|
||||
# fmt="{}{}".format(bo,t)
|
||||
# if len(data)%struct.calcsize(fmt)==0:
|
||||
# entry['infos'][fmt]=[v[0] for v in struct.iter_unpack(fmt,data)]
|
||||
# return entry
|
||||
|
||||
filters=[re.compile(s) for s in sys.argv[2:]]
|
||||
|
||||
with open("all.log", "w") as of:
|
||||
for entry in gen():
|
||||
fm=[(f.match(entry['filename']) is not None) for f in filters]
|
||||
if filters and not any(fm):
|
||||
continue
|
||||
entry["data_len"] = len(entry["data"])
|
||||
entry["str"] = strdump(entry["data"])
|
||||
entry["data"] = tohex(entry["data"])
|
||||
print(
|
||||
"{timestamp} {block_id} {filename} {data_len:08X} {data} {str}".format(**entry), file=of
|
||||
)
|
57
tools/binvis.py
Normal file
57
tools/binvis.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
import struct
|
||||
from collections import OrderedDict, ChainMap
|
||||
|
||||
|
||||
class LittleEndian:
|
||||
byteorder = "<"
|
||||
|
||||
|
||||
class BigEndian:
|
||||
byteorder = ">"
|
||||
|
||||
|
||||
class NativeEndian:
|
||||
byteorder = "@"
|
||||
|
||||
|
||||
class Field:
|
||||
def __init__(self, struct_type=None, size=None, byteorder=None):
|
||||
self.struct = struct_type
|
||||
self.size = size
|
||||
self.byteorder = byteorder
|
||||
self.data = None
|
||||
self.parsed = False
|
||||
|
||||
def parse(self, data):
|
||||
return
|
||||
|
||||
|
||||
class ParserMeta(type):
|
||||
def __new__(cls, name, bases, namespace, **kwargs):
|
||||
if object in bases:
|
||||
return type.__new__(cls, name, bases, dict(namespace))
|
||||
fields = []
|
||||
for item_name, item_value in namespace.items():
|
||||
if isinstance(item_value, Field):
|
||||
fields.append(item_name)
|
||||
ret = super().__new__(cls, name, bases, namespace)
|
||||
ret._fields = fields
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def __prepare__(metacls, name, bases, **kwds):
|
||||
return OrderedDict()
|
||||
|
||||
|
||||
class Parser(metaclass=ParserMeta):
|
||||
def __init__(self, data):
|
||||
for field in self._fields:
|
||||
print(field, getattr(self, field))
|
||||
|
||||
|
||||
class ChunkedHeader(Parser, LittleEndian):
|
||||
size = Field("I")
|
||||
data = Field(size=size)
|
||||
|
||||
|
||||
print(ChunkedHeader(b""))
|
149
tools/dissect_net.py
Normal file
149
tools/dissect_net.py
Normal file
|
@ -0,0 +1,149 @@
|
|||
from construct import *
|
||||
from binascii import unhexlify
|
||||
from collections import defaultdict, Counter
|
||||
import string
|
||||
|
||||
|
||||
class CustomError(SymmetricAdapter):
|
||||
def __init__(self, msg):
|
||||
super(SymmetricAdapter, self).__init__(Pass)
|
||||
self._message = msg
|
||||
|
||||
def _decode(self, obj, context, path):
|
||||
# print("Error",path)
|
||||
# print(str(context))
|
||||
msg = self._message.format(ctx=context, obj=obj)
|
||||
raise ValidationError(message=msg, path=this.path)
|
||||
|
||||
|
||||
paket_type = Enum(
|
||||
Int8ub,
|
||||
GetGameInfo=0x7F01, # 0x7f3d ?
|
||||
Connect=0x7F47,
|
||||
GameInfo=0xBACE,
|
||||
LevelInfo=0x8017,
|
||||
Announce=0x4842,
|
||||
Disconnect=0x0F02,
|
||||
UpdatePlayerInfo=0xC49, # ???
|
||||
# UpdatePlayerInfo=0x8a4c,
|
||||
ChatIn=0x921E,
|
||||
ChatOut=0x0A1E,
|
||||
# Movement=0x802
|
||||
)
|
||||
|
||||
paket_subtype = Enum(
|
||||
Int8ub
|
||||
)
|
||||
|
||||
|
||||
packet_types = {
|
||||
"Movement": Struct("data" / GreedyBytes),
|
||||
"ChatIn": Struct(
|
||||
"unk" / Int16ub,
|
||||
"unk_2" / Int8ub,
|
||||
"msg" / PascalString(Int8ub, "utf-8"),
|
||||
"rest" / GreedyBytes,
|
||||
),
|
||||
"ChatOut": Struct(
|
||||
"unk" / Int16ub,
|
||||
"unk_2" / Int8ub,
|
||||
"msg" / PascalString(Int8ub, "utf-8"),
|
||||
"rest" / GreedyBytes,
|
||||
),
|
||||
"UpdatePlayerInfo": Struct(
|
||||
"data" / GreedyBytes
|
||||
# "name"/PascalString(Int32ub,"utf-8"),
|
||||
# "ship"/PascalString(Int8ub,"utf-8"),
|
||||
# "max_life"/Int8ub,
|
||||
# "player_char"/PascalString(Int16ub,"utf-8"),
|
||||
# "engines"/PascalString(Int8ub,"utf-8")[4],
|
||||
# "weapons"/PascalString(Int8ub,"utf-8"),
|
||||
# "team_id"/Int32ul
|
||||
),
|
||||
"Announce": "info" / CString("utf-8"),
|
||||
"GetGameInfo": Const(b"\x00\x00\x07"),
|
||||
"Disconnect": Const(b"\x00\x0c\x02"),
|
||||
"GameInfo": Struct(
|
||||
"version_minor" / Int8ul,
|
||||
"version_major" / Int8ul,
|
||||
"port" / Int16ul,
|
||||
"max_players" / Int16ul,
|
||||
"curr_players" / Int16ul,
|
||||
"name" / FixedSized(0x20, CString("utf-8")),
|
||||
"mode" / FixedSized(0x10, CString("utf-8")),
|
||||
"map" / Bytes(2),
|
||||
"rest" / GreedyBytes,
|
||||
),
|
||||
"Connect": Struct(
|
||||
"name" / PascalString(Int32ub, "utf-8"),
|
||||
"ship" / PascalString(Int8ub, "utf-8"),
|
||||
"max_life" / Int8ub,
|
||||
"player_char" / PascalString(Int16ub, "utf-8"),
|
||||
"engines" / PascalString(Int8ub, "utf-8")[4],
|
||||
"weapons" / PascalString(Int8ub, "utf-8"),
|
||||
"team_id" / Int32ul,
|
||||
),
|
||||
"LevelInfo": Struct(
|
||||
"path" / PascalString(Int32ub, "utf-8"),
|
||||
"mode" / PascalString(Int8ub, "utf-8"),
|
||||
"rest" / GreedyBytes,
|
||||
),
|
||||
}
|
||||
|
||||
default = "Unknown ID" / Struct("data" / GreedyBytes)
|
||||
# CustomError("Invalid ID: 0x{ctx.type:02x}")
|
||||
packet = Struct(
|
||||
"type" / Int8ub,
|
||||
"subtype"/ Int8ub
|
||||
# "data" / Switch(this.type, packet_types, default=default)
|
||||
)
|
||||
|
||||
|
||||
printable_chars = set(bytes(string.printable, "ascii")) - set(b"\n\r\t\x0b\x0c")
|
||||
|
||||
|
||||
def is_printable(s):
|
||||
return all(c in printable_chars for c in s.rstrip(b"\0"))
|
||||
|
||||
|
||||
def hexdump(data, cols=16, offset=0):
|
||||
lines = []
|
||||
while data:
|
||||
hexdata = " ".join("{:02X}".format(v) for v in data[:cols]).ljust(
|
||||
3 * cols - 1, " "
|
||||
)
|
||||
print_data = "".join(
|
||||
[chr(v) if v in printable_chars else "." for v in data[:cols]]
|
||||
)
|
||||
lines.append("{:04X} {} {}".format(offset, hexdata, print_data))
|
||||
offset += len(data[:cols])
|
||||
data = data[cols:]
|
||||
return "\n".join(lines).strip()
|
||||
|
||||
|
||||
def main():
|
||||
data_type = Counter()
|
||||
with open("netlog.txt", "r") as netlog:
|
||||
for line in netlog:
|
||||
direction, addr, buffer_addr, data = line.strip().split()
|
||||
data = unhexlify(data)
|
||||
print(direction, addr, buffer_addr)
|
||||
print(hexdump(data))
|
||||
print()
|
||||
try:
|
||||
parsed_data = packet.parse(data)
|
||||
data_type["{0} {1:08b}:{2:08b} ({1:02X}:{2:02X})".format(direction, parsed_data.type,parsed_data.subtype)] += len(data)
|
||||
except Exception:
|
||||
pass
|
||||
bar_width = 50
|
||||
label = "Data type (main:sub)"
|
||||
print("=" * 10, label, "=" * 10)
|
||||
max_v = max(data_type.values())
|
||||
total = sum(data_type.values())
|
||||
for k, v in sorted(data_type.items(), key=lambda v: v[1], reverse=True):
|
||||
bar = ("#" * round((v / max_v) * bar_width)).ljust(bar_width, " ")
|
||||
print(k, bar, "({}, {:.02%})".format(v, v / total))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
46
tools/packed.ksy
Normal file
46
tools/packed.ksy
Normal file
|
@ -0,0 +1,46 @@
|
|||
meta:
|
||||
id: packed
|
||||
application: Scrapland
|
||||
file-extension: packed
|
||||
endian: le
|
||||
xref: http://wiki.xentax.com/index.php/Scrapland_PACKED
|
||||
license: MIT
|
||||
encoding: latin1
|
||||
|
||||
seq:
|
||||
- id: magic
|
||||
contents: BFPK
|
||||
doc: File Magic
|
||||
- id: version
|
||||
type: u2
|
||||
size: 4
|
||||
doc: Second File Magic
|
||||
- id: num_files
|
||||
type: u4
|
||||
doc: Number of files
|
||||
- id: files
|
||||
type: file_entry
|
||||
repeat: expr
|
||||
repeat-expr: num_files
|
||||
doc: Directory entry for each file
|
||||
|
||||
types:
|
||||
file_entry:
|
||||
seq:
|
||||
- id: path_len
|
||||
type: u4
|
||||
doc: Length of file path
|
||||
- id: path
|
||||
type: str
|
||||
size: path_len
|
||||
doc: File path
|
||||
- id: size
|
||||
type: u4
|
||||
doc: File size
|
||||
- id: offset
|
||||
type: u4
|
||||
doc: Absoulte File offset
|
||||
instances:
|
||||
data:
|
||||
pos: offset
|
||||
size: size
|
122
tools/parse_LFVF.py
Normal file
122
tools/parse_LFVF.py
Normal file
|
@ -0,0 +1,122 @@
|
|||
import os
|
||||
import json
|
||||
from construct import *
|
||||
|
||||
blocksize = 1024 * 4
|
||||
|
||||
|
||||
def search(pattern, path):
|
||||
seen = set()
|
||||
with open(path, "rb") as infile:
|
||||
buffer = bytearray(infile.read(blocksize))
|
||||
while infile.peek(1):
|
||||
for block in iter(lambda: infile.read(blocksize), b""):
|
||||
buffer += block
|
||||
buffer = buffer[-(blocksize * 2) :]
|
||||
idx = buffer.find(pattern)
|
||||
if idx != -1:
|
||||
pos = (infile.tell() - blocksize * 2) + idx
|
||||
if pos not in seen:
|
||||
seen.add(pos)
|
||||
return sorted(seen)
|
||||
|
||||
|
||||
has_pos = [
|
||||
"D3DFVF_XYZ",
|
||||
"D3DFVF_XYZRHW",
|
||||
]
|
||||
|
||||
num_blend = {
|
||||
'D3DFVF_XYZB1': 1,
|
||||
'D3DFVF_XYZB2': 2,
|
||||
'D3DFVF_XYZB3': 3,
|
||||
'D3DFVF_XYZB4': 4,
|
||||
}
|
||||
|
||||
Vertex = Struct(
|
||||
"pos" / If(lambda ctx: ctx._._.fvf.position in has_pos, Float32l[3]),
|
||||
"rhw" / If(lambda ctx: ctx._._.fvf.position == "D3DFVF_XYZRHW", Float32l),
|
||||
"w_blend" / If(lambda ctx: num_blend.get(ctx._._.fvf.position,0)!=0, Int32ul),
|
||||
"normal" / If(lambda ctx: ctx._._.fvf.flags.D3DFVF_NORMAL, Float32l[3]),
|
||||
"diffuse" / If(lambda ctx: ctx._._.fvf.flags.D3DFVF_DIFFUSE, Int8ul[4]),
|
||||
"specular" / If(lambda ctx: ctx._._.fvf.flags.D3DFVF_SPECULAR, Int8ul[4]),
|
||||
"tex" / Float32l[this.num_tex_coords][this._._.fvf.num_tex],
|
||||
)
|
||||
|
||||
D3DFVF_POSITION_MASK = 0xE
|
||||
D3DFVF_TEXCOUNT_MASK = 0xF00
|
||||
D3DFVF_TEXCOUNT_SHIFT = 8
|
||||
|
||||
FVF = "fvf" / Union(
|
||||
0,
|
||||
"value" / Int32ul,
|
||||
"num_tex"
|
||||
/ Computed(
|
||||
lambda ctx: 1 + ((ctx.value & D3DFVF_TEXCOUNT_MASK) >> D3DFVF_TEXCOUNT_MASK)
|
||||
),
|
||||
"position"
|
||||
/ Enum(
|
||||
Computed(lambda ctx: (ctx.value & D3DFVF_POSITION_MASK)),
|
||||
D3DFVF_XYZ=0x2,
|
||||
D3DFVF_XYZRHW=0x4,
|
||||
D3DFVF_XYZB1=0x6,
|
||||
D3DFVF_XYZB2=0x8,
|
||||
D3DFVF_XYZB3=0xA,
|
||||
D3DFVF_XYZB4=0xC,
|
||||
),
|
||||
"flags"
|
||||
/ FlagsEnum(
|
||||
Int32ul,
|
||||
D3DFVF_RESERVED0=0x1,
|
||||
D3DFVF_NORMAL=0x10,
|
||||
D3DFVF_PSIZE=0x20,
|
||||
D3DFVF_DIFFUSE=0x40,
|
||||
D3DFVF_SPECULAR=0x80,
|
||||
),
|
||||
)
|
||||
|
||||
LFVF_Data = Struct(
|
||||
"unk" / Int32ul,
|
||||
"num_entries"/Int32ul,
|
||||
"data"/Struct(
|
||||
FVF,
|
||||
"unk_size" / Int32ul,
|
||||
"vertices" / PrefixedArray(Int32ul, Vertex),
|
||||
)
|
||||
# Terminated,
|
||||
)
|
||||
|
||||
LFVF = Struct(
|
||||
Const(b"LFVF"), "size" / Int32ul, "data" / RestreamData(Bytes(this.size), LFVF_Data)
|
||||
)
|
||||
|
||||
files = [
|
||||
r"D:\Games\Deep Silver\Scrapland\extracted\Data.packed\models\skies\orbit\sky.sm3",
|
||||
r"D:\Games\Deep Silver\Scrapland\extracted\Data.packed\models\chars\boss\boss.sm3",
|
||||
r"D:\Games\Deep Silver\Scrapland\extracted\Data.packed\models\chars\dtritus\dtritus.sm3",
|
||||
r"D:\Games\Deep Silver\Scrapland\extracted\Data.packed\levels\gdb\map\map3d.emi"
|
||||
]
|
||||
|
||||
vert_pos = {}
|
||||
|
||||
for path in files:
|
||||
name = os.path.split(path)[-1]
|
||||
fh = open(path, "rb")
|
||||
offsets = search(b"LFVF", path)
|
||||
for offset in sorted(offsets):
|
||||
fh.seek(offset)
|
||||
print("Offset:", offset)
|
||||
s = LFVF.parse_stream(fh)
|
||||
print(s)
|
||||
print("=" * 10)
|
||||
continue
|
||||
# # print(s)
|
||||
# print(path, fh.tell(), list(s.unk_ints), list(s.data.unk), fh.read(8))
|
||||
# s = s.data
|
||||
# vpos = [
|
||||
# tuple(p for p in v.pos) for v in s.vertices
|
||||
# ] # leave vertices alone because we don't need to reproject shit :|
|
||||
# vert_pos["{}@{}".format(name, hex(offset))] = vpos
|
||||
# with open("LFVF_Data.json", "w") as of:
|
||||
# json.dump(vert_pos, of)
|
||||
# break
|
117
tools/parse_chunked.py
Normal file
117
tools/parse_chunked.py
Normal file
|
@ -0,0 +1,117 @@
|
|||
from construct import *
|
||||
import binascii
|
||||
import os
|
||||
|
||||
Chunked = LazyBound(lambda: struct)
|
||||
|
||||
|
||||
class CustomError(SymmetricAdapter):
|
||||
def __init__(self, msg):
|
||||
super(SymmetricAdapter, self).__init__(Pass)
|
||||
self._message = msg
|
||||
|
||||
def _decode(self, obj, context, path):
|
||||
# print("Error",path)
|
||||
# print(str(context))
|
||||
msg = "Invalid ID: " + repr(context.id)
|
||||
raise ValidationError(message=msg, path=this.path)
|
||||
|
||||
|
||||
RGB = NamedTuple("RGB", "R G B", Int8ul[3])
|
||||
|
||||
RGBA = NamedTuple("RGBA", "R G B A", Int8ul[4])
|
||||
|
||||
|
||||
def make_chain(*sizes):
|
||||
"utility function to make sequence of byte arrays"
|
||||
return Sequence(*[Bytes(s) for s in sizes])
|
||||
|
||||
|
||||
child_nodes = "children" / Struct("num" / Int32ul, "nodes" / Chunked[this.num])
|
||||
|
||||
subchunks = {
|
||||
b"SM3\0": Struct(
|
||||
"unk" / Bytes(4),
|
||||
"timestamp" / Timestamp(Int32ul, 1, 1970),
|
||||
child_nodes,
|
||||
"scene" / Chunked,
|
||||
),
|
||||
b"SCN\0": Struct(
|
||||
"version" / Int32ul,
|
||||
"m3d_name" / PascalString(Int32ul, "utf8"),
|
||||
"name" / PascalString(Int32ul, "utf8"),
|
||||
child_nodes,
|
||||
),
|
||||
b"INI\0": Struct(
|
||||
"data"
|
||||
/ PrefixedArray(Int32ul, PrefixedArray(Int32ul, PascalString(Int32ul, "utf8"))),
|
||||
"colors?" / Sequence(Int8ul, Int8ul, Int8ul, Int8ul, Float32l)[2],
|
||||
"unk_data" / Bytes(0x18),
|
||||
"unk_float" / Float32l,
|
||||
"unk_int" / Int32ul,
|
||||
child_nodes,
|
||||
),
|
||||
b"EMI\0": Struct(
|
||||
"version"/Int32ul,
|
||||
"num_materials"/Int32ul,
|
||||
"num_unk"/Int32ul,
|
||||
"materials"/Chunked
|
||||
),
|
||||
|
||||
b"MAT\0": Struct(
|
||||
"tris"/Int32ul,
|
||||
"name"/PascalString(Int32ul,"utf8"),
|
||||
"idx"/Bytes(this.tris*4*4)
|
||||
),
|
||||
|
||||
None: Bytes(lambda ctx:ctx.size),
|
||||
}
|
||||
|
||||
struct = Struct(
|
||||
"id" / Bytes(4),
|
||||
"size" / Int32ul,
|
||||
"data" / Switch(this.id, subchunks, default=subchunks[None]),
|
||||
)
|
||||
|
||||
|
||||
def io_peek(fh, n):
|
||||
p = fh.tell()
|
||||
ret = fh.read(n)
|
||||
fh.seek(p)
|
||||
return ret
|
||||
|
||||
|
||||
basedir = r"D:/Games/Deep Silver/Scrapland/extracted/Data.packed"
|
||||
|
||||
files = [
|
||||
r"Models/Elements/AnilloEstructuraA/AnilloEstructuraA.SM3",
|
||||
r"models/elements/antenaa/antenaa.lod1.sm3",
|
||||
r"models/elements/abshield/anm/loop.cm3",
|
||||
r"levels/fake/map/map3d.amc",
|
||||
r"levels/shipedit/map/map3d.dum",
|
||||
r"levels/menu/map/map3d.emi",
|
||||
r"Models/Skies/Menu/Sky.SM3",
|
||||
r"Levels/Menu/Map/Map3D.SM3",
|
||||
r"Models/Elements/AnilloEstructuraD/AnilloEstructuraD.LOD1.SM3",
|
||||
r"levels/menu/map/map3d.amc",
|
||||
r"levels/menu/map/map3d.dum",
|
||||
r"levels/menu/map/scenecamera/anm/loop.cm3",
|
||||
r"models/chars/boss/boss.sm3",
|
||||
r"models/chars/boss/anm/boss_walk.cm3",
|
||||
]
|
||||
for file in files:
|
||||
file = os.path.join(basedir, file).replace("/","\\")
|
||||
print()
|
||||
print("#" * 3, file)
|
||||
with open(file, "rb") as infile:
|
||||
try:
|
||||
data = struct.parse_stream(infile)
|
||||
# assert infile.read()==b"","leftover data"
|
||||
except Exception as ex:
|
||||
print("Error:", ex)
|
||||
data = None
|
||||
if data:
|
||||
print(data)
|
||||
print("OFFSET:", hex(infile.tell()))
|
||||
print("NEXT:", io_peek(infile, 16))
|
||||
print("NEXT:", binascii.hexlify(io_peek(infile, 16)))
|
255
tools/parse_chunked_new.py
Normal file
255
tools/parse_chunked_new.py
Normal file
|
@ -0,0 +1,255 @@
|
|||
import os
|
||||
import sys
|
||||
import struct
|
||||
import string
|
||||
from pprint import pprint
|
||||
from io import BytesIO
|
||||
from contextlib import contextmanager
|
||||
from datetime import timedelta, datetime
|
||||
import glob
|
||||
|
||||
printable_chars = set(bytes(string.printable, "ascii")) - set(b"\n\r\t\x0b\x0c")
|
||||
|
||||
|
||||
def hexdump(data, cols=16, offset=0, markers=None):
|
||||
if markers is None:
|
||||
markers = []
|
||||
lines = []
|
||||
while True:
|
||||
hexdata = " ".join("{:02X}".format(v) for v in data[:cols]).ljust(
|
||||
3 * cols - 1, " "
|
||||
)
|
||||
print_data = "".join(
|
||||
[chr(v) if v in printable_chars else "." for v in data[:cols]]
|
||||
)
|
||||
lines.append("{:04X} {} {}".format(offset, hexdata, print_data))
|
||||
offset += len(data[:cols])
|
||||
data = data[cols:]
|
||||
if not data:
|
||||
break
|
||||
return "\n".join(lines).strip()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def seek_to(fh, offset, pos=None):
|
||||
if pos is None:
|
||||
pos = fh.tell()
|
||||
fh.seek(offset)
|
||||
yield
|
||||
fh.seek(pos)
|
||||
|
||||
def read_array(s,fh):
|
||||
ret=[]
|
||||
count = read_struct("<I", fh)[0]
|
||||
size = struct.calcsize(s)
|
||||
for _ in range(count):
|
||||
ret.append(read_struct(s,fh))
|
||||
return ret
|
||||
|
||||
|
||||
def read_struct(s, fh):
|
||||
size = struct.calcsize(s)
|
||||
return struct.unpack(s, fh.read(size))
|
||||
|
||||
|
||||
def read_str(fh):
|
||||
size = read_struct("<I", fh)[0]
|
||||
return fh.read(size)
|
||||
|
||||
|
||||
def read_block(fh):
|
||||
try:
|
||||
pos = fh.tell()
|
||||
magic = str(fh.read(4).rstrip(b"\x00"), "utf8")
|
||||
size = read_struct("<I", fh)[0]
|
||||
data = fh.read(size)
|
||||
return magic, data
|
||||
except struct.error:
|
||||
fh.seek(pos)
|
||||
return
|
||||
|
||||
|
||||
vals = set()
|
||||
|
||||
# ================================
|
||||
class Parser:
|
||||
depth = 0
|
||||
dump_size = 0x100
|
||||
|
||||
def __init__(self, debug=False):
|
||||
self.debug = debug
|
||||
|
||||
def _default(self, magic, fh):
|
||||
print("=====", magic, "=====")
|
||||
if self.debug:
|
||||
print(hexdump(fh.read(self.dump_size)))
|
||||
rest = len(fh.read())
|
||||
if rest:
|
||||
print("<{} more bytes>".format(rest))
|
||||
fh.seek(0)
|
||||
return "<Unparsed {} ({} bytes)>".format(magic, len(fh.read()))
|
||||
|
||||
def parse(self, magic, data, depth=0):
|
||||
print("{}[{}] {} bytes".format(" " * self.depth, magic, len(data)))
|
||||
self.depth += 1
|
||||
fh = BytesIO(data)
|
||||
ret = getattr(self, magic, lambda fh: self._default(magic, fh))(fh)
|
||||
pos = fh.tell()
|
||||
leftover = len(fh.read())
|
||||
fh.seek(pos)
|
||||
self.depth -= 1
|
||||
if leftover:
|
||||
print("{}[{}] {} bytes unparsed".format(" " * self.depth, magic, leftover))
|
||||
if self.debug:
|
||||
print(hexdump(fh.read(self.dump_size)))
|
||||
rest = len(fh.read())
|
||||
if rest:
|
||||
print("<{} more bytes>".format(rest))
|
||||
print("-" * 50)
|
||||
return ret
|
||||
|
||||
def parse_block(self, fh):
|
||||
block = read_block(fh)
|
||||
if block:
|
||||
return self.parse(*block)
|
||||
|
||||
# Block definitions
|
||||
|
||||
def SM3(self, fh):
|
||||
ret = {}
|
||||
ret["unk_1"] = fh.read(4) # always F8156500
|
||||
ret["timestamp_2"] = datetime.fromtimestamp(read_struct("<I", fh)[0])
|
||||
ret["unk_2"] = fh.read(4) # always 00000000
|
||||
ret["scene"] = self.parse_block(fh)
|
||||
assert fh.read() == b"", "Leftover Data"
|
||||
return ret
|
||||
|
||||
def SCN(self, fh):
|
||||
ret = {}
|
||||
ret["unk_1"] = read_struct("<I", fh)[0]
|
||||
ret["model_name"] = read_str(fh)
|
||||
ret["node_name"] = read_str(fh)
|
||||
if read_struct("<I", fh)[0]:
|
||||
ret["ini_1"] = self.parse_block(fh)
|
||||
ret["unk_c_1"] = read_struct("<BBBB", fh)
|
||||
ret["unk_f_1"] = read_struct("<f", fh)[0]
|
||||
ret["unk_c_2"] = read_struct("<BBBB", fh)
|
||||
ret["unk_f_l"] = read_struct("<ffffffff", fh)
|
||||
if read_struct("<I", fh)[0]:
|
||||
ret["ini_2"] = self.parse_block(fh)
|
||||
ret["num_mat"] = read_struct("<I", fh)[0]
|
||||
ret["mat"] = []
|
||||
for _ in range(ret["num_mat"]):
|
||||
ret["mat"].append(self.parse_block(fh))
|
||||
# ret["children"] = []
|
||||
# for _ in range(read_struct("<I", fh)[0]):
|
||||
# ret["children"].append(self.parse_block(fh))
|
||||
# ret["unk_2"] = []
|
||||
# for _ in range(4):
|
||||
# ret["unk_2"].append(read_struct("<fff", fh))
|
||||
# ret["materials"] = []
|
||||
# for _ in range(read_struct("<I", fh)[0]):
|
||||
# ret["materials"].append(self.parse_block(fh))
|
||||
return ret
|
||||
|
||||
def INI(self, fh):
|
||||
num_sections = read_struct("<I", fh)[0]
|
||||
sections = []
|
||||
for _ in range(num_sections):
|
||||
num_lines = read_struct("<I", fh)[0]
|
||||
lines = []
|
||||
for _ in range(num_lines):
|
||||
lines.append(str(read_str(fh).rstrip(b"\0"), "latin1"))
|
||||
sections.append("\n".join(lines))
|
||||
lines.clear()
|
||||
assert fh.read() == b"", "Leftover Data"
|
||||
return sections
|
||||
|
||||
def MAT(self, fh):
|
||||
# ret = {}
|
||||
# ret["unk_1"] = read_struct("<I", fh)[0]
|
||||
# ret["name"] = read_str(fh)
|
||||
# ret["colors?"] = ["{:08X}".format(v) for v in read_struct(">7I", fh)]
|
||||
# ret["maps"]=[]
|
||||
# for _ in range(ret["num_maps"]):
|
||||
# ret["maps"].append(self.parse_block(fh))
|
||||
return {"maps": fh.read().count(b"MAP\0")}
|
||||
|
||||
def MAP(self, fh):
|
||||
ret = {}
|
||||
ret["unk_1"] = read_struct("<I", fh)[0]
|
||||
ret["name"] = read_str(fh)
|
||||
ret["unk_2"] = read_struct("<IIII", fh)
|
||||
ret["unk_3"] = read_struct("<fff", fh)
|
||||
ret["unk_4"] = read_struct("<II", fh)
|
||||
ret["rest"] = fh.read()
|
||||
return ret
|
||||
|
||||
# def CM3(self, fh):
|
||||
# return len(fh.read())
|
||||
|
||||
def DUM(self, fh):
|
||||
ret = {}
|
||||
ret["unk_1"] = read_struct("<I", fh)
|
||||
ret["num_dummies"] = read_struct("<I", fh)[0]
|
||||
ret["unk_2"] = read_struct("<I", fh)
|
||||
ret["dummies"] = []
|
||||
for _ in range(ret["num_dummies"]):
|
||||
dum = {}
|
||||
dum["name"] = read_str(fh)
|
||||
dum["pos"] = read_struct("<fff", fh)
|
||||
dum["rot"] = read_struct("<fff", fh)
|
||||
dum["has_ini"] = read_struct("<I", fh)[0]
|
||||
if dum["has_ini"]:
|
||||
dum['ini']=self.parse_block(fh)
|
||||
dum["has_next"] = read_struct("<I", fh)[0]
|
||||
ret["dummies"].append(dum)
|
||||
assert fh.read() == b"", "Leftover Data"
|
||||
return ret
|
||||
|
||||
# def AMC(self, fh):
|
||||
# return len(fh.read())
|
||||
|
||||
# def EMI(self, fh):
|
||||
# return len(fh.read())
|
||||
|
||||
|
||||
# ================================
|
||||
|
||||
basedir = r"D:/Games/Deep Silver/Scrapland/extracted/Data.packed"
|
||||
|
||||
files = [
|
||||
r"Models/Chars/Dtritus/Dtritus.sm3",
|
||||
r"Models/Elements/AnilloEstructuraA/AnilloEstructuraA.SM3",
|
||||
r"models/elements/antenaa/antenaa.lod1.sm3",
|
||||
# r"models/elements/abshield/anm/loop.cm3",
|
||||
# r"levels/fake/map/map3d.amc",
|
||||
# r"levels/shipedit/map/map3d.dum",
|
||||
# r"levels/menu/map/map3d.emi",
|
||||
r"Models/Skies/Menu/Sky.SM3",
|
||||
r"Levels/Menu/Map/Map3D.SM3",
|
||||
r"Models/Elements/AnilloEstructuraD/AnilloEstructuraD.LOD1.SM3",
|
||||
# r"levels/menu/map/map3d.amc",
|
||||
# r"levels/menu/map/map3d.dum",
|
||||
# r"levels/menu/map/scenecamera/anm/loop.cm3",
|
||||
r"models/chars/boss/boss.sm3",
|
||||
# r"models/chars/boss/anm/boss_walk.cm3",
|
||||
]
|
||||
|
||||
filt = [s.lower() for s in sys.argv[1:]]
|
||||
|
||||
for root, folders, files in os.walk(basedir):
|
||||
for file in files:
|
||||
path = os.path.join(root, file).replace("\\","/")
|
||||
if not path.lower().endswith(".dum".lower()):
|
||||
continue
|
||||
print("Parsing", path)
|
||||
p = Parser(debug=True)
|
||||
with open(path, "rb") as fh:
|
||||
while True:
|
||||
parsed = p.parse_block(fh)
|
||||
if not parsed:
|
||||
break
|
||||
pprint(parsed, compact=False, indent=4)
|
||||
print("#" * 50)
|
||||
|
31
tools/rbingrep.py
Normal file
31
tools/rbingrep.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
import binascii
|
||||
import os
|
||||
import sys
|
||||
|
||||
exe_file = os.path.abspath(sys.argv[1])
|
||||
|
||||
def search(pattern, path):
|
||||
seen = set()
|
||||
with open(path, "rb") as infile:
|
||||
buffer = bytearray(infile.read(blocksize))
|
||||
while infile.peek(1):
|
||||
for block in iter(lambda: infile.read(blocksize), b""):
|
||||
buffer += block
|
||||
buffer = buffer[-(blocksize * 2) :]
|
||||
idx = buffer.find(pattern)
|
||||
if idx != -1:
|
||||
pos = (infile.tell() - blocksize * 2) + idx
|
||||
if pos not in seen:
|
||||
seen.add(pos)
|
||||
return sorted(seen)
|
||||
|
||||
|
||||
markers = [ "AMC", "ANI", "CAM", "CM3", "CMSH", "DUM", "EMI", "EVA", "INI", "LFVF", "LUZ", "MAP", "MAT", "MD3D", "NAE", "NAM", "PORT", "QUAD", "SCN", "SM3", "SUEL", "TRI", ]
|
||||
|
||||
blocksize = 1024 * 4
|
||||
for marker in markers:
|
||||
pattern = bytes(marker, "utf8").ljust(4, b"\0")
|
||||
res = search(pattern, exe_file)
|
||||
print("?e "+marker)
|
||||
for addr in res:
|
||||
print("/r `?P {}`".format(hex(addr)))
|
50
tools/render_ai_path.py
Normal file
50
tools/render_ai_path.py
Normal file
|
@ -0,0 +1,50 @@
|
|||
from construct import *
|
||||
|
||||
AI_PATH = "Path" / Struct(
|
||||
"num_nodes" / Int32ul,
|
||||
"nodes" / Float32l[3][this.num_nodes],
|
||||
"edges" / PrefixedArray(Int32ul, Float32l[3])[this.num_nodes],
|
||||
)
|
||||
|
||||
data = AI_PATH.parse_file(sys.argv[1])
|
||||
|
||||
nodes = [tuple(node) for node in data.nodes]
|
||||
edges = [[nodes.index(tuple(p)) for p in edge] for edge in data.edges]
|
||||
|
||||
# Run in Blender:
|
||||
"""
|
||||
import bpy
|
||||
import numpy as np
|
||||
import itertools as ITT
|
||||
|
||||
nodes = <paste_nodes>
|
||||
edges_=<paste_edges>
|
||||
|
||||
# pasted node and edges here
|
||||
|
||||
edges=[]
|
||||
|
||||
for edge in edges_:
|
||||
for a,b in zip(edge,edge[1:]):
|
||||
edges.append(a)
|
||||
edges.append(b)
|
||||
|
||||
nodes=[[p*0.0001 for p in node] for node in nodes]
|
||||
|
||||
me = bpy.data.meshes.new("Test")
|
||||
|
||||
nodes = np.array(list(ITT.chain.from_iterable(nodes)))
|
||||
|
||||
me.vertices.add(len(nodes)//3)
|
||||
me.vertices.foreach_set("co", nodes)
|
||||
me.edges.add(len(edges)//2)
|
||||
me.edges.foreach_set("vertices", np.array(edges))
|
||||
|
||||
me.update(calc_edges=True)
|
||||
me.validate()
|
||||
|
||||
ob = bpy.data.objects.new("Test", me)
|
||||
|
||||
scene = bpy.context.scene
|
||||
scene.collection.objects.link(ob)
|
||||
"""
|
27
tools/save_to_json.py
Normal file
27
tools/save_to_json.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
import sys
|
||||
import os
|
||||
from construct import *
|
||||
import json
|
||||
|
||||
save_data = {}
|
||||
|
||||
ScrapSaveVar = Struct(
|
||||
"name" / PascalString(Int32ul, encoding="windows-1252"),
|
||||
"data" / PascalString(Int32ul, encoding="windows-1252"),
|
||||
)
|
||||
ScrapSave = "ScarpSaveGame" / Struct(
|
||||
"title" / PascalString(Int32ul, encoding="windows-1252"),
|
||||
"id" / PascalString(Int32ul, encoding="windows-1252"),
|
||||
"data" / PrefixedArray(Int32ul, ScrapSaveVar),
|
||||
Terminated,
|
||||
)
|
||||
|
||||
with open(sys.argv[1], "rb") as sav_file:
|
||||
save = ScrapSave.parse_stream(sav_file)
|
||||
save_data["id"] = save.id
|
||||
save_data["title"] = save.title
|
||||
save_data["data"] = {}
|
||||
for var in save.data:
|
||||
save_data["data"][var.name] = var.data
|
||||
with open(os.path.basename(sys.argv[1]) + ".json", "w") as of:
|
||||
json.dump(save_data, of, indent=4)
|
143
tools/scrapper.py
Normal file
143
tools/scrapper.py
Normal file
|
@ -0,0 +1,143 @@
|
|||
import argparse
|
||||
from collections import OrderedDict
|
||||
import glob
|
||||
import os
|
||||
import shutil
|
||||
from construct import (
|
||||
Struct,
|
||||
PascalString,
|
||||
Int32ul,
|
||||
Lazy,
|
||||
Pointer,
|
||||
Bytes,
|
||||
this,
|
||||
PrefixedArray,
|
||||
Const,
|
||||
Debugger
|
||||
)
|
||||
from tqdm import tqdm
|
||||
|
||||
ScrapFile = Struct(
|
||||
"path" / PascalString(Int32ul, encoding="ascii"),
|
||||
"size" / Int32ul,
|
||||
"offset" / Int32ul,
|
||||
"data" / Lazy(Pointer(this.offset, Bytes(this.size))),
|
||||
)
|
||||
DummyFile = Struct(
|
||||
"path" / PascalString(Int32ul, encoding="u8"), "size" / Int32ul, "offset" / Int32ul
|
||||
)
|
||||
|
||||
PackedHeader = Struct(
|
||||
Const(b"BFPK"), Const(b"\0\0\0\0"), "files" / PrefixedArray(Int32ul, ScrapFile)
|
||||
)
|
||||
DummyHeader = Struct(
|
||||
Const(b"BFPK"), Const(b"\0\0\0\0"), "files" / PrefixedArray(Int32ul, DummyFile)
|
||||
)
|
||||
parser = argparse.ArgumentParser(description="Unpack and Repack .packed files")
|
||||
parser.add_argument(
|
||||
"-u", "--unpack", action="store_true", help="unpack file to 'extracted' directory"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-r", "--repack", action="store_true", help="repack file from 'extracted' directory"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--reset", action="store_true", default=False, help="restore backup"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"scrap_dir",
|
||||
metavar="Scrapland Directory",
|
||||
type=str,
|
||||
default=".",
|
||||
help="Scrapland installation directory",
|
||||
)
|
||||
options = parser.parse_args()
|
||||
scrap_dir = os.path.abspath(options.scrap_dir)
|
||||
|
||||
if options.reset:
|
||||
print("Restoring Backups and removing extracted folder...")
|
||||
for packed_file in glob.glob(os.path.join(scrap_dir, "*.packed.bak")):
|
||||
outfile = os.path.basename(packed_file)
|
||||
orig_filename = outfile[:-4]
|
||||
if os.path.isfile(outfile):
|
||||
print("deleting", orig_filename)
|
||||
os.remove(orig_filename)
|
||||
print("moving", outfile, "->", orig_filename)
|
||||
shutil.move(outfile, orig_filename)
|
||||
target_folder = os.path.join("extracted", os.path.basename(orig_filename))
|
||||
print("deleting", target_folder)
|
||||
shutil.rmtree(target_folder)
|
||||
if os.path.isdir("extracted"):
|
||||
input("Press enter to remove rest of extracted folder")
|
||||
shutil.rmtree("extracted")
|
||||
exit("Done!")
|
||||
|
||||
if not (options.unpack or options.repack):
|
||||
parser.print_help()
|
||||
exit()
|
||||
pstatus = ""
|
||||
if options.unpack:
|
||||
if os.path.isdir("extracted"):
|
||||
print("Removing extracted folder")
|
||||
shutil.rmtree("extracted")
|
||||
for packed_file in glob.glob(os.path.join(scrap_dir, "*.packed")):
|
||||
os.chdir(scrap_dir)
|
||||
BN = os.path.basename(packed_file)
|
||||
target_folder = os.path.join("extracted", os.path.basename(packed_file))
|
||||
os.makedirs(target_folder, exist_ok=True)
|
||||
os.chdir(target_folder)
|
||||
print("Unpacking {}".format(os.path.basename(packed_file)))
|
||||
with open(packed_file, "rb") as pkfile:
|
||||
data = PackedHeader.parse_stream(pkfile)
|
||||
print("Offset:", hex(pkfile.tell()))
|
||||
for file in tqdm(data.files, ascii=True):
|
||||
folder, filename = os.path.split(file.path)
|
||||
if folder:
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
with open(file.path, "wb") as outfile:
|
||||
outfile.write(file.data())
|
||||
print("\r" + " " * len(pstatus) + "\r", end="", flush=True)
|
||||
os.chdir(scrap_dir)
|
||||
|
||||
if options.unpack and options.repack:
|
||||
input(
|
||||
"Press enter to rebuild *.packed files from folders in 'extracted' dir..."
|
||||
) # noqa
|
||||
pass
|
||||
|
||||
|
||||
def file_gen(files, offset=0):
|
||||
for real_path, size, path in files:
|
||||
file = dict(path=path, offset=offset, size=size)
|
||||
yield file
|
||||
offset += file["size"]
|
||||
|
||||
|
||||
def make_header(files, offset=0):
|
||||
files_list = list(file_gen(files, offset))
|
||||
return DummyHeader.build(dict(files=files_list))
|
||||
|
||||
|
||||
if options.repack:
|
||||
for folder in glob.glob(os.path.join(scrap_dir, "extracted", "*.packed")):
|
||||
data = []
|
||||
filename = os.path.join(scrap_dir, os.path.basename(folder))
|
||||
for root, folders, files in os.walk(folder):
|
||||
for file in sorted(files):
|
||||
file = os.path.join(root, file)
|
||||
rel_path = bytes(
|
||||
file.replace(folder, "").replace("\\", "/").lstrip("/"),
|
||||
"windows-1252",
|
||||
)
|
||||
size = os.stat(file).st_size
|
||||
data.append((file, size, rel_path))
|
||||
print("Found {} files for {}".format(len(data), filename))
|
||||
offset = len(make_header(data))
|
||||
print("Writing", filename)
|
||||
header = make_header(data, offset)
|
||||
with open(filename, "wb") as outfile:
|
||||
outfile.write(header)
|
||||
for file, size, rel_path in tqdm(data, ascii=True):
|
||||
outfile.write(open(file, "rb").read())
|
||||
print("Done!")
|
45
tools/server.py
Normal file
45
tools/server.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
import socket
|
||||
import binascii
|
||||
import select
|
||||
from construct import *
|
||||
from socketserver import BaseRequestHandler,UDPServer
|
||||
|
||||
INFO = Struct(
|
||||
"version_minor" / Int8ul,
|
||||
"version_major" / Int8ul,
|
||||
"port" / Int16ul,
|
||||
"max_players" / Int16ul,
|
||||
"curr_players" / Int16ul,
|
||||
"name" / FixedSized(0x20, CString("utf-8")),
|
||||
"mode" / FixedSized(0x10, CString("utf-8")),
|
||||
"map" / Bytes(2),
|
||||
"rest" / GreedyBytes,
|
||||
)
|
||||
|
||||
class ScrapHandler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
data, socket = self.request
|
||||
print(self.client_address,data)
|
||||
socket.sendto(data, self.client_address)
|
||||
|
||||
class ScrapSrv(UDPServer):
|
||||
def __init__(self,port=5000):
|
||||
super().__init__(("0.0.0.0",port),ScrapHandler)
|
||||
|
||||
with ScrapSrv() as srv:
|
||||
srv.serve_forever()
|
||||
|
||||
exit()
|
||||
|
||||
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
while True:
|
||||
rl, wl, xl = select.select([sock], [sock], [sock], 0.1)
|
||||
if rl:
|
||||
print(rl)
|
||||
for sock in rl:
|
||||
data, src = sock.recvfrom(1024)
|
||||
print(src, data)
|
||||
if data == b"\x7f\x01\x00\x00\x07":
|
||||
game_info = INFO.build()
|
||||
sock.sendto(game_info, src)
|
4
tools/test.bv
Normal file
4
tools/test.bv
Normal file
|
@ -0,0 +1,4 @@
|
|||
<
|
||||
magic s 4
|
||||
size I
|
||||
data n $size
|
Loading…
Add table
Add a link
Reference in a new issue