Choggbuster/ff_d2v.py

205 lines
5.4 KiB
Python

import sys
import json
import os
import subprocess as SP
import itertools as ITT
from tqdm import tqdm
# Maps the colour-matrix name found in a stream's "color_space" field to the
# numeric code written into the "matrix" column of each d2v index line.
# "reserved" (code 3) is included so that every code from 0 to 14 has a name.
colorspace = {
    "gbr": 0,
    "bt709": 1,
    "unknown": 2,
    "reserved": 3,
    "fcc": 4,
    "bt470bg": 5,
    "smpte170m": 6,
    "smpte240m": 7,
    "ycgco": 8,
    "bt2020nc": 9,
    "bt2020c": 10,
    "smpte2085": 11,
    "chroma-derived-nc": 12,
    "chroma-derived-c": 13,
    "ictcp": 14,
}

# Two-bit picture-type code; make_flags shifts it into bits 5-4 of the
# per-frame flag byte.
pict_types = {"I": 0b01, "P": 0b10, "B": 0b11}
def make_info(frames):
    """Build the 12-bit "info" field for one index line (one group of frames).

    Args:
        frames: Non-empty list of frame dicts. Each must carry an
            "interlaced_frame" entry that is ``0``/``1`` or the strings
            ``"0"``/``"1"``; the first frame may carry a "tags" mapping.

    Returns:
        The info value as an int (bit 11 always set, bit 9 = progressive,
        bit 8 = first frame has a "timecode" tag).

    Raises:
        IndexError: If ``frames`` is empty.
    """
    # The values may be strings, and "0" is truthy, so convert to int before
    # testing -- otherwise every group would be reported as interlaced.
    has_interlaced = any(int(frame["interlaced_frame"]) for frame in frames)
    # NOTE(review): frames produced by get_frames are flat str->str dicts, so
    # a nested "tags" mapping is only present if the caller supplies one --
    # confirm how timecode tags appear in the ffprobe output being parsed.
    new_gop = "timecode" in frames[0].get("tags", {})

    info = 1 << 11  # always 1
    info |= 0 << 10  # 0=Closed GOP, 1=Open GOP
    info |= int(not has_interlaced) << 9  # Progressive
    info |= int(new_gop) << 8
    return info
def make_flags(frames):
    """Encode each frame as a two-digit hex flag byte.

    Bit layout (most significant first): bit 7 = decodable without the
    previous group (always set here), bit 6 = progressive, bits 5-4 = picture
    type, bits 3-2 = reserved (zero), bit 1 = top field first, bit 0 = repeat
    first field.

    Args:
        frames: Iterable of frame dicts with "interlaced_frame", "pict_type",
            "top_field_first" and "repeat_pict" entries (ints or numeric
            strings; "pict_type" must be a key of ``pict_types``).

    Returns:
        List of lowercase two-character hex strings, one per frame.

    Raises:
        KeyError: If a frame's "pict_type" is not in ``pict_types``.
    """
    flags = []
    for frame in frames:
        needs_prev = False
        progressive = int(not int(frame["interlaced_frame"]))
        pt = pict_types[frame["pict_type"]]
        reserved = 0b00
        # Collapse both values to a single bit. "repeat_pict" in particular
        # may be reported as a count greater than 1, which would otherwise
        # spill into the TFF/reserved bits instead of setting RFF.
        tff = int(bool(int(frame["top_field_first"])))
        rff = int(bool(int(frame["repeat_pict"])))

        flag = int(not needs_prev) << 7
        flag |= progressive << 6
        flag |= pt << 4
        flag |= reserved << 2
        flag |= tff << 1
        flag |= rff
        flags.append(f"{flag:02x}")
    return flags
def make_line(frames, stream):
    """Format one index line for a group of frames.

    The line consists of space-separated fields: info (3 hex digits), matrix,
    file, position, skip, vob, cell, followed by one flag byte per frame.

    Args:
        frames: Non-empty list of frame dicts; the first frame's "pkt_pos"
            is used as the group's position.
        stream: Stream dict; its optional "color_space" entry selects the
            matrix code.

    Returns:
        The formatted line as a string (no trailing newline).
    """
    info = f"{make_info(frames):03x}"
    # The "color_space" key may be missing (ffprobe's JSON output appears to
    # omit it when unspecified -- confirm) or hold a name the table does not
    # know; both cases fall back to the table's "unknown" code instead of
    # raising KeyError.
    unknown_matrix = colorspace["unknown"]
    matrix = colorspace.get(stream.get("color_space", "unknown"), unknown_matrix)
    file_index = 0
    position = frames[0]["pkt_pos"]
    skip = 0
    vob = 0
    cell = 0
    flags = make_flags(frames)
    fields = [info, matrix, file_index, position, skip, vob, cell, *flags]
    return " ".join(map(str, fields))
def get_frames(path):
    """Stream frame records for the first video stream of ``path``.

    Runs ``ffprobe -show_frames -print_format compact`` and parses its output
    line by line.

    Args:
        path: Path of the media file to probe.

    Yields:
        One dict per output line, shaped ``{section: {key: value, ...}}``
        where ``section`` is the first ``|``-separated token of the line
        and all keys and values are strings.

    Raises:
        SystemExit: With ffprobe's exit status if it terminates with a
            non-zero code (raised after all output has been consumed).
    """
    command = [
        "ffprobe",
        "-probesize",
        str(0x7FFFFFFF),
        "-analyzeduration",
        str(0x7FFFFFFF),
        "-v",
        "fatal",
        "-i",
        path,
        "-select_streams",
        "v:0",
        "-show_frames",
        "-print_format",
        "compact",
    ]
    # Default (buffered) pipe: iterating lines over an unbuffered raw pipe is
    # far slower and brings no benefit here. The context manager closes the
    # pipe and reaps the process even if the consumer stops early.
    with SP.Popen(command, stdout=SP.PIPE, stdin=SP.DEVNULL) as proc:
        for raw_line in proc.stdout:
            section, *fields = raw_line.decode("utf8").strip().split("|")
            # Split on the first "=" only so values that themselves contain
            # "=" do not break the parse.
            record = {}
            for field in fields:
                key, _, value = field.partition("=")
                record[key] = value
            yield {section: record}
        ret = proc.wait()
    if ret != 0:
        sys.exit(ret)
def get_streams(path):
    """Return stream and container metadata for ``path`` as reported by ffprobe.

    Runs ``ffprobe -show_streams -show_format -print_format json`` restricted
    to the first video stream.

    Args:
        path: Path of the media file to probe.

    Returns:
        A ``(streams, format)`` tuple taken from the "streams" and "format"
        entries of ffprobe's JSON output.

    Raises:
        SystemExit: With ffprobe's exit status if it terminates with a
            non-zero code.
    """
    command = [
        "ffprobe",
        "-probesize",
        str(0x7FFFFFFF),
        "-analyzeduration",
        str(0x7FFFFFFF),
        "-v",
        "fatal",
        "-i",
        path,
        "-select_streams",
        "v:0",
        "-show_streams",
        "-show_format",
        "-print_format",
        "json",
    ]
    result = SP.run(command, stdout=SP.PIPE, stdin=SP.DEVNULL)
    # Check the exit status before parsing: on failure the output may be
    # empty or incomplete, and a JSON decode error would hide the real cause.
    if result.returncode != 0:
        sys.exit(result.returncode)
    data = json.loads(result.stdout)
    return data["streams"], data["format"]
def make_header(file):
    """Return the three header lines of the index file.

    Args:
        file: Path of the source media file (relative or absolute).

    Returns:
        A list holding the format identifier, the string "1", and the
        absolute path of ``file``.
    """
    absolute_path = os.path.abspath(file)
    return ["DGIndexProjectFile16", "1", absolute_path]
def make_settings(stream):
    """Yield the ``key=value`` settings lines of the index file.

    Args:
        stream: Stream dict providing "width", "height", "r_frame_rate"
            (a ``"num/den"`` string), "color_range" and
            "display_aspect_ratio".

    Yields:
        One ``"Key=value"`` string per setting, in a fixed order.
    """
    picture_size = f"{stream['width']}x{stream['height']}"

    # Frame rate is written as the integer rate times 1000, followed by the
    # original fraction in parentheses.
    rate_parts = [int(part) for part in stream["r_frame_rate"].split("/")]
    milli_rate = (rate_parts[0] * 1000) // rate_parts[1]
    rate_text = f"{milli_rate} ({stream['r_frame_rate']})"

    settings = {
        "Stream_Type": 0,  # Elementary Stream
        "MPEG_Type": 2,  # MPEG-2
        "iDCT_Algorithm": 5,  # 64-bit IEEE-1180 Reference
        "YUVRGB_Scale": int(stream["color_range"] != "tv"),
        "Luminance_Filter": "0,0",
        "Clipping": "0,0,0,0",
        "Aspect_Ratio": stream["display_aspect_ratio"],
        "Picture_Size": picture_size,
        "Field_Operation": 0,  # Honor Pulldown Flags
        "Frame_Rate": rate_text,
        "Location": "0,0,0,0",
    }
    for key, value in settings.items():
        yield f"{key}={value}"
def gen_d2v(path):
    """Yield the lines of the index file for ``path``, then a ``None`` sentinel.

    Output order: header lines, an empty line, settings lines, an empty line,
    one index line per group of frames (a new group starts at every I frame),
    and finally ``None`` so the consumer can recognise the last real line.

    Args:
        path: Path of the media file to index.

    Yields:
        Strings without trailing newlines, followed by a single ``None``.

    Raises:
        IndexError: If ffprobe reports no video stream.
    """
    yield from make_header(path)
    yield ""
    streams, fmt = get_streams(path)
    stream = [s for s in streams if s["codec_type"] == "video"][0]
    # Frame records carry "stream_index" as a string, so compare as strings.
    stream["index"] = str(stream["index"])
    yield from make_settings(stream)
    yield ""

    total_size = int(fmt["size"])
    line_buffer = []
    prog_bar = tqdm(
        get_frames(path),
        total=total_size,
        unit_divisor=1024,
        unit_scale=True,
        unit="iB",
        desc="Writing d2v",
    )
    for line in prog_bar:
        if "frame" not in line:
            continue
        frame = line["frame"]
        # Progress is tracked by byte position in the file rather than by
        # item count; never move backwards or past the file size.
        prog_bar.n = min(max(prog_bar.n, int(frame["pkt_pos"])), total_size)
        prog_bar.update(0)
        if frame["stream_index"] != stream["index"]:
            continue
        if frame["pict_type"] == "I" and line_buffer:
            yield make_line(line_buffer, stream)
            line_buffer.clear()
        line_buffer.append(frame)
    prog_bar.close()
    # Flush the final group: no further I frame arrives to trigger it, and
    # without this the last group would be missing from the index.
    if line_buffer:
        yield make_line(line_buffer, stream)
    yield None
def make_d2v(path):
    """Write the index for ``path`` to ``<basename>.d2v`` in the current directory.

    Lines come from ``gen_d2v``; the line immediately preceding its ``None``
    sentinel gets the end marker `` ff`` appended.

    Args:
        path: Path of the media file to index.
    """
    stem, _ext = os.path.splitext(os.path.basename(path))
    outfile = os.path.extsep.join([stem, "d2v"])

    lines = gen_d2v(path)
    # One-item lookahead: hold a line back until the next item is known, so
    # the last real line can be recognised by the sentinel that follows it.
    pending = next(lines)
    with open(outfile, "w") as fh:
        for upcoming in lines:
            fh.write(pending)
            if upcoming is None:  # last line, append end marker
                fh.write(" ff")
            fh.write("\n")
            pending = upcoming