Choggbuster is a set of Python scripts aimed at automated preprocessing of video DVDs for archival and filtering.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

320 lines
13 KiB

import functools
import os
from datetime import timedelta
import cffi
from tqdm import tqdm
from dvdread import DVDRead
def loadlib(dll_path, *includes, **kwargs):
    """Create a cffi ``FFI``, declare the given headers and open a library.

    Args:
        dll_path: Name or path of the shared library passed to ``ffi.dlopen``.
        *includes: Paths of files whose text is fed to ``ffi.cdef``, in order.
        **kwargs: Extra options forwarded to ``ffi.cdef`` (see the note in the
            body about how they are forwarded).

    Returns:
        A ``(ffi, lib)`` tuple: the ``FFI`` instance and the opened library.
    """
    ffi = cffi.FFI()
    for include in includes:
        # Read through a context manager so the header file is closed
        # immediately instead of whenever the garbage collector gets to it.
        with open(include) as header:
            declarations = header.read()
        # NOTE(review): ``kwargs`` is handed over as ONE positional argument,
        # so it lands in cdef's second parameter (``override`` in the cffi
        # API) rather than being expanded into keywords.  A caller passing
        # ``pack=True`` therefore gets a truthy ``override`` and no packing.
        # Left unchanged on purpose: switching to ``**kwargs`` would change
        # struct layout and drop the override, which must be verified against
        # the real headers first.
        ffi.cdef(declarations, kwargs)
    return ffi, ffi.dlopen(dll_path)
# Human-readable names for the domain values carried by VTS change events.
# The keys are the raw integers read from the event structure.
domains = dict(
    [
        (0, "None"),
        (1, "FirstPlay"),
        (2, "VTSTitle"),
        (4, "VMGM"),
        (8, "VTSMenu"),
    ]
)
# Names of the event codes returned by ``dvdnav_get_next_block``; used only to
# print a readable label for events the reader does not handle.
# There is no entry for code 11.
events = dict(
    [
        (0, "DVDNAV_BLOCK_OK"),
        (1, "DVDNAV_NOP"),
        (2, "DVDNAV_STILL_FRAME"),
        (3, "DVDNAV_SPU_STREAM_CHANGE"),
        (4, "DVDNAV_AUDIO_STREAM_CHANGE"),
        (5, "DVDNAV_VTS_CHANGE"),
        (6, "DVDNAV_CELL_CHANGE"),
        (7, "DVDNAV_NAV_PACKET"),
        (8, "DVDNAV_STOP"),
        (9, "DVDNAV_HIGHLIGHT"),
        (10, "DVDNAV_SPU_CLUT_CHANGE"),
        (12, "DVDNAV_HOP_CHANNEL"),
        (13, "DVDNAV_WAIT"),
    ]
)
class DVDError(Exception):
    """Raised when a libdvdnav call fails or a request is invalid."""
class DVDNav(object):
    """cffi-based wrapper around libdvdnav for inspecting and ripping a DVD.

    Constructing an instance loads the library, opens ``path`` and fills
    ``self.titles`` with one entry per usable title: number of parts and
    angles, duration, chapter start times, audio streams and subtitle
    streams.  ``get_blocks`` then streams the data blocks of one title.
    """

    # ``audio_attr_t.audio_format`` value -> codec name stored in ``titles``.
    _AUDIO_CODECS = {0: "ac3", 2: "mpeg1", 3: "mpeg-2ext", 4: "lpcm", 6: "dts"}
    # ``audio_attr_t.code_extension`` value -> kind of audio track.
    _AUDIO_TYPES = {
        0: None,
        1: "normal",
        2: "descriptive",
        3: "director's commentary",
        4: "alternate director's commentary",
    }
    # Domain value ("VTSMenu" in ``domains``) that means playback went back
    # to a menu.
    _DOMAIN_VTS_MENU = 8

    def __init__(self, path, verbose=None, method="disc"):
        """Load libdvdnav, open ``path`` and collect title information.

        Args:
            path: Location handed to ``dvdnav_open`` (encoded as UTF-8).
            verbose: Value for the ``DVDCSS_VERBOSE`` environment variable;
                ``None`` removes the variable instead.
            method: Value for the ``DVDCSS_METHOD`` environment variable.

        Raises:
            DVDError: If libdvdnav reports an error while opening.
        """
        # Defined first so __del__ is safe even if the setup below fails.
        self.dvd = None
        if verbose is None:
            os.environ.pop("DVDCSS_VERBOSE", None)
        else:
            os.environ["DVDCSS_VERBOSE"] = str(verbose)
        os.environ["DVDCSS_METHOD"] = method
        self.ffi, self.lib = loadlib(
            "libdvdnav-4.dll",
            "dvd_types.h",
            "dvd_reader.h",
            "ifo_types.h",
            "nav_types.h",
            "dvdnav_events.h",
            "dvdnav.h",
            pack=True,
        )
        self.path = path
        self.titles = {}
        self.open(path)

    def __del__(self):
        """Close the libdvdnav handle, if one was ever opened."""
        dvd = getattr(self, "dvd", None)
        lib = getattr(self, "lib", None)
        # A half-constructed instance (library not loaded, or dvdnav_open
        # failed) has nothing to close.
        if not dvd or lib is None:
            return
        # Forget the handle before closing so a failing close cannot make
        # __check_error query the error string of an already closed handle.
        self.dvd = None
        self.__check_error(lib.dvdnav_close(dvd))

    def __repr__(self):
        return "<DVD Path={0.path} Title={0.title} Serial={0.serial}>".format(self)

    def get_blocks(self, title, angle=1, slang=None):
        """Play ``title`` and yield its data blocks as ``bytes``.

        Playback ends when libdvdnav reports ``DVDNAV_STOP``, when a cell
        with an already seen fingerprint shows up, when playback moves to a
        VTS/domain pair that was already visited, or when it enters the
        VTS menu domain.  The navigator is stopped and the progress bar is
        closed when the generator finishes, fails or is closed early.

        Args:
            title: Title number passed to ``dvdnav_title_play``.
            angle: Angle to select; ``0`` skips angle validation/selection.
            slang: If not ``None``, passed to ``dvdnav_spu_language_select``
                (presumably a bytes language code -- TODO confirm).

        Yields:
            ``bytes`` copies of each block reported as ``DVDNAV_BLOCK_OK``.

        Raises:
            DVDError: On a libdvdnav error or an out-of-range ``angle``.
        """
        lib = self.lib
        self.__check_error(lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(lib.dvdnav_title_play(self.dvd, title))
        curr_angle = self.ffi.new("int32_t*", 0)
        num_angles = self.ffi.new("int32_t*", 0)
        self.__check_error(
            lib.dvdnav_get_angle_info(self.dvd, curr_angle, num_angles)
        )
        if angle != 0:
            if angle < 1 or angle > num_angles[0]:
                raise DVDError("Invalid angle specified!")
            if angle != curr_angle[0]:
                self.__check_error(lib.dvdnav_angle_change(self.dvd, angle))
        if slang is not None:
            self.__check_error(lib.dvdnav_spu_language_select(self.dvd, slang))
        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        # Events that need no action at all while ripping.
        ignored_events = (
            lib.DVDNAV_SPU_CLUT_CHANGE,
            lib.DVDNAV_HOP_CHANNEL,
            lib.DVDNAV_NOP,
            lib.DVDNAV_HIGHLIGHT,
            lib.DVDNAV_NAV_PACKET,
            lib.DVDNAV_SPU_STREAM_CHANGE,
            lib.DVDNAV_AUDIO_STREAM_CHANGE,
        )
        progbar = tqdm(
            unit_divisor=1024,
            unit_scale=True,
            unit="iB",
            desc="Ripping DVD",
            disable=False,
        )
        ripped = set()  # (vtsN, domain) pairs playback has already left
        cells = set()  # fingerprints of cells seen in the current VTS
        current_vts = (None, None)
        current_cell = None
        current_pg = None
        try:
            while True:
                self.__check_error(
                    lib.dvdnav_get_next_block(self.dvd, buf, ev, size)
                )
                if (
                    lib.dvdnav_get_position(self.dvd, pos, total_size)
                    == lib.DVDNAV_STATUS_OK
                ):
                    # Positions are scaled by 2048 to feed the byte-based
                    # progress bar (presumably the DVD sector size).
                    progbar.total = total_size[0] * 2048
                    # Never let the bar move backwards or past its total.
                    progbar.n = max(progbar.n, min(progbar.total, pos[0] * 2048))
                    progbar.update(0)
                progbar.set_postfix(
                    vts=current_vts,
                    cell=current_cell,
                    pg=current_pg,
                    angle=angle,
                    title=title,
                )
                event = ev[0]
                if event in ignored_events:
                    continue
                if event == lib.DVDNAV_BLOCK_OK:
                    # Slice to copy: ``buf`` is reused for the next block.
                    yield self.ffi.buffer(buf, size[0])[:]
                elif event == lib.DVDNAV_STOP:
                    progbar.write(f"[{title}|{angle}] Stop")
                    break
                elif event == lib.DVDNAV_STILL_FRAME:
                    self.__check_error(lib.dvdnav_still_skip(self.dvd))
                elif event == lib.DVDNAV_WAIT:
                    self.__check_error(lib.dvdnav_wait_skip(self.dvd))
                elif event == lib.DVDNAV_CELL_CHANGE:
                    cell = self.ffi.cast("dvdnav_cell_change_event_t*", buf)
                    current_cell = cell.cellN
                    current_pg = cell.pgN
                    progbar.write(
                        f"[{title}|{angle}] Cell: {cell.cellN} "
                        f"({hex(cell.cell_start)}-"
                        f"{hex(cell.cell_start + cell.cell_length)}), "
                        f"PG: {cell.pgN} ({hex(cell.pg_start)}-"
                        f"{hex(cell.pg_start + cell.pg_length)})"
                    )
                    # Seeing the exact same cell again within one VTS means
                    # playback is going in circles.
                    fingerprint = (
                        current_vts[0],
                        current_vts[1],
                        cell.cellN,
                        cell.pgN,
                        cell.cell_length,
                        cell.pg_length,
                        cell.pgc_length,
                        cell.cell_start,
                        cell.pg_start,
                    )
                    if fingerprint in cells:
                        progbar.write(f"[{title}|{angle}] Cells Looped!")
                        break
                    cells.add(fingerprint)
                elif event == lib.DVDNAV_VTS_CHANGE:
                    vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                    # Fall back to the raw number for unlisted domain values
                    # instead of failing with KeyError.
                    old_domain = domains.get(vts.old_domain, vts.old_domain)
                    new_domain = domains.get(vts.new_domain, vts.new_domain)
                    new_vts = (vts.new_vtsN, vts.new_domain)
                    old_vts = (vts.old_vtsN, vts.old_domain)
                    ripped.add(old_vts)
                    cells.clear()
                    progbar.write(
                        f"[{title}|{angle}] VTS: {vts.old_vtsN} "
                        f"({vts.old_domain} {old_domain}) -> {vts.new_vtsN} "
                        f"({vts.new_domain} {new_domain})"
                    )
                    # ``old_vts`` was just added to ``ripped``, so this also
                    # covers a change from a VTS to itself.
                    if new_vts in ripped:
                        progbar.write(f"[{title}|{angle}] VTS Looped!")
                        break
                    current_vts = new_vts
                    if vts.new_domain == self._DOMAIN_VTS_MENU:
                        progbar.write(f"[{title}|{angle}] VTS Back to menu!")
                        break
                else:
                    progbar.write(
                        f"[{title}|{angle}] Unhandled: "
                        f"{events.get(event, event)} {size[0]}"
                    )
        finally:
            # Runs on normal end, on errors and when the consumer abandons
            # the generator, so the navigator and the bar are always released.
            try:
                self.__check_error(lib.dvdnav_stop(self.dvd))
            finally:
                progbar.close()

    def __check_error(self, ret):
        """Raise ``DVDError`` if ``ret`` is libdvdnav's error status.

        The message comes from ``dvdnav_err_to_string`` when a handle is
        open; otherwise it is ``"Unknown error"``.
        """
        if ret != self.lib.DVDNAV_STATUS_ERR:
            return
        if self.dvd:
            message = self.ffi.string(self.lib.dvdnav_err_to_string(self.dvd))
            # Decode so the exception text is readable instead of ``b'...'``.
            raise DVDError(message.decode("utf8", "replace"))
        raise DVDError("Unknown error")

    def __get_vts(self, title):
        """Return the ``(vtsN, domain name)`` sequence visited by ``title``.

        Skips through the title program by program and records every VTS
        change until playback stops, no next program exists or the VTS menu
        domain is entered.  The call to this helper in ``open`` is commented
        out.
        """
        lib = self.lib
        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        self.__check_error(lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(lib.dvdnav_title_play(self.dvd, title))
        seq = []
        while True:
            self.__check_error(lib.dvdnav_get_next_block(self.dvd, buf, ev, size))
            event = ev[0]
            if event == lib.DVDNAV_BLOCK_OK:
                self.__check_error(
                    lib.dvdnav_get_position(self.dvd, pos, total_size)
                )
                # Jump straight to the next program; failure means there is
                # none left.
                if lib.dvdnav_next_pg_search(self.dvd) == lib.DVDNAV_STATUS_ERR:
                    break
            elif event == lib.DVDNAV_STOP:
                break
            elif event == lib.DVDNAV_STILL_FRAME:
                self.__check_error(lib.dvdnav_still_skip(self.dvd))
            elif event == lib.DVDNAV_WAIT:
                self.__check_error(lib.dvdnav_wait_skip(self.dvd))
            elif event == lib.DVDNAV_VTS_CHANGE:
                vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                seq.append(
                    (vts.new_vtsN, domains.get(vts.new_domain, vts.new_domain))
                )
                if vts.new_domain == self._DOMAIN_VTS_MENU:
                    break
        self.__check_error(lib.dvdnav_stop(self.dvd))
        return seq

    def __get_titles(self):
        """Fill ``self.titles`` with parts, angles, duration and chapters.

        Titles for which libdvdnav reports no parts or a zero duration are
        left out.  ``chapters`` is a list of ``timedelta`` values, or
        ``None`` when no chapter table was returned.
        """
        lib = self.lib
        count = self.ffi.new("int32_t*", 0)
        # NOTE(review): libdvdnav appears to replace ``times[0]`` with its own
        # allocation that the caller is expected to free; nothing here frees
        # it, and ``p_times`` then goes unused -- confirm against dvdnav.h.
        p_times = self.ffi.new("uint64_t[]", 512)
        times = self.ffi.new("uint64_t**", p_times)
        duration = self.ffi.new("uint64_t*", 0)
        self.__check_error(lib.dvdnav_get_number_of_titles(self.dvd, count))
        num_titles = count[0]
        # Starts at 0 like the original; numbers libdvdnav rejects are skipped
        # by the parts check below.
        for title in range(0, num_titles + 1):
            if (
                lib.dvdnav_get_number_of_parts(self.dvd, title, count)
                == lib.DVDNAV_STATUS_ERR
            ):
                continue
            num_parts = count[0]
            # Reset so a failed angle query cannot report the parts count.
            count[0] = 0
            lib.dvdnav_get_number_of_angles(self.dvd, title, count)
            num_angles = count[0]
            num_chapters = lib.dvdnav_describe_title_chapters(
                self.dvd, title, times, duration
            )
            if duration[0] == 0:
                continue
            if num_chapters == 0 and times[0] == self.ffi.NULL:
                chapters = None
            else:
                # Values are divided by 90000 to get seconds (presumably a
                # 90 kHz clock).
                chapters = [
                    timedelta(seconds=times[0][t] / 90000)
                    for t in range(num_chapters)
                ]
            self.titles[title] = {
                "parts": num_parts,
                "angles": num_angles,
                "duration": timedelta(seconds=duration[0] / 90000),
                "chapters": chapters,
            }

    def __read_string(self, getter):
        """Call a ``getter(dvd, char**)`` function and return its text.

        Returns ``None`` when the library leaves the pointer NULL or the
        stripped string is empty.
        """
        out = self.ffi.new("char**", self.ffi.NULL)
        getter(self.dvd, out)
        if out[0] == self.ffi.NULL:
            return None
        # Undecodable bytes are replaced rather than aborting the open.
        return str(self.ffi.string(out[0]), "utf8", "replace").strip() or None

    def __get_info(self):
        """Read the disc title and serial strings, then the title table."""
        # Each lookup uses a fresh out-pointer so a failed serial query can
        # never echo the title string.
        self.title = self.__read_string(self.lib.dvdnav_get_title_string)
        self.serial = self.__read_string(self.lib.dvdnav_get_serial_string)
        self.__get_titles()

    @staticmethod
    def __lang_code(code):
        """Turn a 16-bit language code into its two-character string."""
        return str(code.to_bytes(2, "big"), "utf8", "replace")

    def open(self, path):
        """Open ``path`` and populate title, serial and ``self.titles``.

        For every title the audio and subtitle streams are enumerated and
        stored under the ``"audio"`` and ``"subtitles"`` keys, indexed by
        the logical stream id.

        Raises:
            DVDError: If a checked libdvdnav call fails.
        """
        lib = self.lib
        audio_attrs = self.ffi.new("audio_attr_t*")
        spu_attr = self.ffi.new("subp_attr_t*")
        dvdnav = self.ffi.new("dvdnav_t**", self.ffi.cast("dvdnav_t*", 0))
        self.__check_error(lib.dvdnav_open(dvdnav, bytes(path, "utf8")))
        self.dvd = dvdnav[0]
        self.__check_error(lib.dvdnav_set_readahead_flag(self.dvd, 1))
        self.__get_info()
        for title in self.titles:
            self.__check_error(lib.dvdnav_title_play(self.dvd, title))
            audio_streams = {}
            subtitle_streams = {}
            self.titles[title]["audio"] = audio_streams
            self.titles[title]["subtitles"] = subtitle_streams
            for n in range(255):
                stream_id = lib.dvdnav_get_audio_logical_stream(self.dvd, n)
                if stream_id == -1:
                    continue
                self.__check_error(
                    lib.dvdnav_get_audio_attr(self.dvd, stream_id, audio_attrs)
                )
                alang = None
                if audio_attrs.lang_type:
                    alang = self.__lang_code(audio_attrs.lang_code)
                audio_streams[stream_id] = {
                    "lang": alang,
                    "channels": audio_attrs.channels + 1,
                    # Values missing from the tables map to None instead of
                    # aborting the whole disc scan with KeyError.
                    "codec": self._AUDIO_CODECS.get(audio_attrs.audio_format),
                    "type": self._AUDIO_TYPES.get(audio_attrs.code_extension),
                }
            for n in range(255):
                stream_id = lib.dvdnav_get_spu_logical_stream(self.dvd, n)
                if stream_id == -1:
                    continue
                self.__check_error(
                    lib.dvdnav_get_spu_attr(self.dvd, stream_id, spu_attr)
                )
                slang = None
                if spu_attr.type == 1:
                    slang = self.__lang_code(spu_attr.lang_code)
                subtitle_streams[stream_id] = {
                    "lang": slang,
                }
            self.__check_error(lib.dvdnav_stop(self.dvd))