# Choggbuster/dvdnav.py
#
# Source listing metadata: 320 lines, 13 KiB, Python

import functools
import os
from datetime import timedelta
import cffi
from tqdm import tqdm
from dvdread import DVDRead
def loadlib(dll_path, *includes, **kwargs):
    """Declare C headers on a fresh cffi FFI object and open a shared library.

    Args:
        dll_path: Name or path of the shared library handed to ``ffi.dlopen``.
        *includes: Paths of header files; each one is read completely and
            passed to ``ffi.cdef`` in the order given.
        **kwargs: Extra options collected from the caller (see the NOTE in
            the body for how they reach ``ffi.cdef``).

    Returns:
        A ``(ffi, lib)`` tuple: the ``cffi.FFI`` instance and the library
        object returned by ``ffi.dlopen``.
    """
    ffi = cffi.FFI()
    for include in includes:
        # Read inside a context manager so the header file is closed right
        # away instead of lingering until garbage collection.
        with open(include) as header:
            source = header.read()
        # NOTE(review): the kwargs dict is passed as cdef()'s second
        # positional argument rather than being unpacked with **kwargs.
        # That looks unintended (e.g. ``pack=True`` never reaches cdef as a
        # keyword), but unpacking would change how structs are laid out, so
        # the original call is preserved here -- confirm before changing.
        ffi.cdef(source, kwargs)
    return ffi, ffi.dlopen(dll_path)
# Display names for the domain codes carried by VTS-change events.
domains = {0: "None"}
# Every non-zero code is a successive power of two: 1, 2, 4, 8.
domains.update(
    (1 << bit, label)
    for bit, label in enumerate(("FirstPlay", "VTSTitle", "VMGM", "VTSMenu"))
)
# Display names for libdvdnav event codes, used when logging an event that
# has no dedicated handler.
events = dict(
    enumerate(
        (
            "DVDNAV_BLOCK_OK",
            "DVDNAV_NOP",
            "DVDNAV_STILL_FRAME",
            "DVDNAV_SPU_STREAM_CHANGE",
            "DVDNAV_AUDIO_STREAM_CHANGE",
            "DVDNAV_VTS_CHANGE",
            "DVDNAV_CELL_CHANGE",
            "DVDNAV_NAV_PACKET",
            "DVDNAV_STOP",
            "DVDNAV_HIGHLIGHT",
            "DVDNAV_SPU_CLUT_CHANGE",
        )
    )
)
# Code 11 has no entry in this table; the numbering resumes at 12.
events.update({12: "DVDNAV_HOP_CHANNEL", 13: "DVDNAV_WAIT"})
class DVDError(Exception):
    """Error raised for failed libdvdnav calls and invalid playback requests."""
class DVDNav(object):
    """Wrapper around libdvdnav (loaded through cffi) for a single DVD.

    Constructing an instance opens the disc and fills in ``title``,
    ``serial`` and ``titles`` (a dict keyed by title number holding parts,
    angles, duration, chapters, audio and subtitle information).
    ``get_blocks`` then streams the data blocks of one title.
    """

    # Lookup tables for the audio attribute fields read in __scan_audio.
    _AUDIO_CODECS = {0: "ac3", 2: "mpeg1", 3: "mpeg-2ext", 4: "lpcm", 6: "dts"}
    _AUDIO_TYPES = {
        0: None,
        1: "normal",
        2: "descriptive",
        3: "director's commentary",
        4: "alternate director's commentary",
    }

    def __init__(self, path, verbose=None, method="disc"):
        """Configure libdvdcss through the environment, load libdvdnav, open *path*.

        Args:
            path: Location of the DVD, passed to ``dvdnav_open``.
            verbose: Value stored in ``DVDCSS_VERBOSE``; ``None`` removes the
                variable from the environment instead.
            method: Value stored in ``DVDCSS_METHOD``.

        Raises:
            DVDError: If libdvdnav reports an error while opening the disc.
        """
        # Assign every attribute used by __del__/__repr__ up front so a
        # failure further down leaves the object in a safe state.
        self.dvd = None
        self.lib = None
        self.ffi = None
        self.title = None
        self.serial = None
        self.path = path
        self.titles = {}
        if verbose is None:
            os.environ.pop("DVDCSS_VERBOSE", None)
        else:
            os.environ["DVDCSS_VERBOSE"] = str(verbose)
        os.environ["DVDCSS_METHOD"] = method
        self.ffi, self.lib = loadlib(
            "libdvdnav-4.dll",
            "dvd_types.h",
            "dvd_reader.h",
            "ifo_types.h",
            "nav_types.h",
            "dvdnav_events.h",
            "dvdnav.h",
            pack=True,
        )
        self.open(path)

    def __del__(self):
        """Close the libdvdnav handle if one is open.

        Does nothing when the library was never loaded or the disc was
        never opened, so a half-constructed object is finalised quietly.
        """
        dvd = getattr(self, "dvd", None)
        lib = getattr(self, "lib", None)
        if dvd is None or lib is None:
            return
        # Drop the reference first so the handle cannot be closed twice.
        self.dvd = None
        # The status is deliberately not routed through __check_error: that
        # would query the handle that has just been closed.
        lib.dvdnav_close(dvd)

    def __repr__(self):
        """Return a short description with path, disc title and serial."""
        return "<DVD Path={0.path} Title={0.title} Serial={0.serial}>".format(self)

    def get_blocks(self, title, angle=1, slang=None):
        """Play *title* and yield its data blocks as ``bytes`` objects.

        Playback ends on a STOP event, when a cell or VTS is seen a second
        time (loop detection), or when playback moves into domain 8 (the
        code named "VTSMenu" in ``domains``).  Progress and navigation
        events are reported through a tqdm progress bar.

        Args:
            title: Title number passed to ``dvdnav_title_play``.
            angle: Angle to select; ``0`` keeps whatever angle is current.
            slang: Optional value passed to ``dvdnav_spu_language_select``.

        Yields:
            One ``bytes`` object per ``DVDNAV_BLOCK_OK`` event.

        Raises:
            DVDError: On an invalid angle or when libdvdnav reports an error.
        """
        lib = self.lib
        self.__check_error(lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(lib.dvdnav_title_play(self.dvd, title))
        curr_angle = self.ffi.new("int32_t*", 0)
        num_angles = self.ffi.new("int32_t*", 0)
        self.__check_error(
            lib.dvdnav_get_angle_info(self.dvd, curr_angle, num_angles)
        )
        if angle != 0:
            if angle < 1 or angle > num_angles[0]:
                raise DVDError("Invalid angle specified!")
            if angle != curr_angle[0]:
                self.__check_error(lib.dvdnav_angle_change(self.dvd, angle))
        if slang is not None:
            self.__check_error(lib.dvdnav_spu_language_select(self.dvd, slang))
        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        # Events that need no action at all; built once, outside the loop.
        ignored = (
            lib.DVDNAV_SPU_CLUT_CHANGE,
            lib.DVDNAV_HOP_CHANNEL,
            lib.DVDNAV_NOP,
            lib.DVDNAV_HIGHLIGHT,
            lib.DVDNAV_NAV_PACKET,
            lib.DVDNAV_SPU_STREAM_CHANGE,
            lib.DVDNAV_AUDIO_STREAM_CHANGE,
        )
        progbar = tqdm(
            unit_divisor=1024,
            unit_scale=True,
            unit="iB",
            desc="Ripping DVD",
            disable=False,
        )
        ripped = set()  # (vts number, domain) pairs already left behind
        cells = set()  # fingerprints of cells seen in the current VTS
        current_vts = (None, None)
        current_cell = None
        current_pg = None
        try:
            while True:
                self.__check_error(
                    lib.dvdnav_get_next_block(self.dvd, buf, ev, size)
                )
                if (
                    lib.dvdnav_get_position(self.dvd, pos, total_size)
                    == lib.DVDNAV_STATUS_OK
                ):
                    # Positions are scaled by 2048 (presumably the DVD
                    # sector size in bytes -- confirm).
                    progbar.total = total_size[0] * 2048
                    # Never let the bar move backwards or past its total.
                    progbar.n = max(progbar.n, min(progbar.total, pos[0] * 2048))
                    progbar.update(0)
                    progbar.set_postfix(
                        vts=current_vts,
                        cell=current_cell,
                        pg=current_pg,
                        angle=angle,
                        title=title,
                    )
                event = ev[0]
                if event in ignored:
                    continue
                if event == lib.DVDNAV_BLOCK_OK:
                    # Slice to copy the data out of the reused C buffer.
                    yield self.ffi.buffer(buf, size[0])[:]
                elif event == lib.DVDNAV_STOP:
                    progbar.write(f"[{title}|{angle}] Stop")
                    break
                elif event == lib.DVDNAV_STILL_FRAME:
                    self.__check_error(lib.dvdnav_still_skip(self.dvd))
                elif event == lib.DVDNAV_WAIT:
                    self.__check_error(lib.dvdnav_wait_skip(self.dvd))
                elif event == lib.DVDNAV_CELL_CHANGE:
                    cell = self.ffi.cast("dvdnav_cell_change_event_t*", buf)
                    current_cell = cell.cellN
                    current_pg = cell.pgN
                    progbar.write(
                        f"[{title}|{angle}] Cell: {cell.cellN} ({hex(cell.cell_start)}-{hex(cell.cell_start+cell.cell_length)}), PG: {cell.pgN} ({hex(cell.pg_start)}-{hex(cell.pg_start+cell.pg_length)})"
                    )
                    fp = (
                        current_vts[0],
                        current_vts[1],
                        cell.cellN,
                        cell.pgN,
                        cell.cell_length,
                        cell.pg_length,
                        cell.pgc_length,
                        cell.cell_start,
                        cell.pg_start,
                    )
                    if fp in cells:
                        progbar.write(f"[{title}|{angle}] Cells Looped!")
                        break
                    cells.add(fp)
                elif event == lib.DVDNAV_VTS_CHANGE:
                    vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                    # Fall back to the raw code for domains missing from
                    # the table; the names are only used for logging.
                    old_domain = domains.get(vts.old_domain, vts.old_domain)
                    new_domain = domains.get(vts.new_domain, vts.new_domain)
                    old_vts = (vts.old_vtsN, vts.old_domain)
                    new_vts = (vts.new_vtsN, vts.new_domain)
                    ripped.add(old_vts)
                    cells.clear()
                    progbar.write(f"[{title}|{angle}] VTS: {vts.old_vtsN} ({vts.old_domain} {old_domain}) -> {vts.new_vtsN} ({vts.new_domain} {new_domain})")
                    # old_vts was just added, so this also catches a change
                    # from a VTS to itself.
                    if new_vts in ripped:
                        progbar.write(f"[{title}|{angle}] VTS Looped!")
                        break
                    current_vts = new_vts
                    if vts.new_domain == 8:  # back to menu
                        progbar.write(f"[{title}|{angle}] VTS Back to menu!")
                        break
                else:
                    progbar.write(
                        f"[{title}|{angle}] Unhandled: {events.get(ev[0],ev[0])} {size[0]}"
                    )
        finally:
            # Runs on normal completion, on errors, and when the consumer
            # abandons the generator early.
            try:
                self.__check_error(lib.dvdnav_stop(self.dvd))
            finally:
                progbar.close()

    def __check_error(self, ret):
        """Raise DVDError if *ret* equals ``DVDNAV_STATUS_ERR``.

        The message comes from ``dvdnav_err_to_string`` when a handle is
        open; otherwise a generic message is used.
        """
        if ret != self.lib.DVDNAV_STATUS_ERR:
            return
        if self.dvd:
            err = self.ffi.string(self.lib.dvdnav_err_to_string(self.dvd))
            # ffi.string() yields bytes; decode so the message reads cleanly.
            raise DVDError(err.decode("utf8", "replace"))
        raise DVDError("Unknown error")

    def __get_vts(self, title):
        """Play *title* program by program and list the VTS changes seen.

        After each data block the player is asked to jump to the next
        program.  The walk ends when that jump returns 0, on a STOP event,
        or when a VTS change enters domain 8.

        Returns:
            A list of ``(new_vtsN, domain name)`` tuples, one per
            VTS-change event; the raw code stands in for unlisted domains.
        """
        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", self.lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        self.__check_error(self.lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(self.lib.dvdnav_title_play(self.dvd, title))
        seq = []
        while True:
            self.__check_error(
                self.lib.dvdnav_get_next_block(self.dvd, buf, ev, size)
            )
            event = ev[0]
            if event == self.lib.DVDNAV_BLOCK_OK:
                self.__check_error(
                    self.lib.dvdnav_get_position(self.dvd, pos, total_size)
                )
                if self.lib.dvdnav_next_pg_search(self.dvd) == 0:
                    break
            elif event == self.lib.DVDNAV_STOP:
                break
            elif event == self.lib.DVDNAV_STILL_FRAME:
                self.__check_error(self.lib.dvdnav_still_skip(self.dvd))
            elif event == self.lib.DVDNAV_WAIT:
                self.__check_error(self.lib.dvdnav_wait_skip(self.dvd))
            elif event == self.lib.DVDNAV_VTS_CHANGE:
                vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                new_domain = domains.get(vts.new_domain, vts.new_domain)
                seq.append((vts.new_vtsN, new_domain))
                if vts.new_domain == 8:
                    break
        self.__check_error(self.lib.dvdnav_stop(self.dvd))
        return seq

    def __get_titles(self):
        """Fill ``self.titles`` with parts, angles, duration and chapters.

        Titles for which the part count cannot be read, or whose reported
        duration is zero, are left out.
        """
        count = self.ffi.new("int32_t*", 0)
        p_times = self.ffi.new("uint64_t[]", 512)
        times = self.ffi.new("uint64_t**", p_times)
        duration = self.ffi.new("uint64_t*", 0)
        self.lib.dvdnav_get_number_of_titles(self.dvd, count)
        num_titles = count[0]
        for title in range(0, num_titles + 1):
            if self.lib.dvdnav_get_number_of_parts(self.dvd, title, count) == 0:
                continue
            num_parts = count[0]
            self.lib.dvdnav_get_number_of_angles(self.dvd, title, count)
            num_angles = count[0]
            # NOTE(review): libdvdnav appears to hand back a freshly
            # allocated array through `times` that the caller should free;
            # it is not freed here -- confirm against dvdnav.h.
            num_chapters = self.lib.dvdnav_describe_title_chapters(
                self.dvd, title, times, duration
            )
            if duration[0] == 0:
                continue
            if times[0] == self.ffi.NULL:
                # No chapter array to read; never index through NULL.
                chapters = None if num_chapters == 0 else []
            else:
                # Timestamps are divided by 90000 to get seconds
                # (presumably 90 kHz clock ticks -- confirm).
                chapters = [
                    timedelta(seconds=times[0][t] / 90000)
                    for t in range(num_chapters)
                ]
            self.titles[title] = {
                "parts": num_parts,
                "angles": num_angles,
                "duration": timedelta(seconds=duration[0] / 90000),
                "chapters": chapters,
            }

    def __read_string(self, getter):
        """Call a ``getter(dvd, char**)`` library function and decode the result.

        Returns the stripped text, or ``None`` when the library left the
        pointer NULL or the text is empty.
        """
        out = self.ffi.new("char**", self.ffi.NULL)
        getter(self.dvd, out)
        if out[0] == self.ffi.NULL:
            return None
        return str(self.ffi.string(out[0]), "utf8", "replace").strip() or None

    def __get_info(self):
        """Read the disc title and serial strings, then the title table."""
        # A fresh out-pointer per call keeps a failed serial lookup from
        # silently reporting the title string again.
        self.title = self.__read_string(self.lib.dvdnav_get_title_string)
        self.serial = self.__read_string(self.lib.dvdnav_get_serial_string)
        self.__get_titles()

    def __scan_audio(self):
        """Return ``{logical stream id: info}`` for the title being played."""
        attrs = self.ffi.new("audio_attr_t*")
        streams = {}
        for n in range(255):
            stream_id = self.lib.dvdnav_get_audio_logical_stream(self.dvd, n)
            if stream_id == -1:
                continue
            self.__check_error(
                self.lib.dvdnav_get_audio_attr(self.dvd, stream_id, attrs)
            )
            alang = None
            if attrs.lang_type:
                # The language code holds two characters in one integer.
                alang = str(attrs.lang_code.to_bytes(2, "big"), "utf8")
            streams[stream_id] = {
                "lang": alang,
                "channels": attrs.channels + 1,
                # Unlisted codes map to None instead of raising KeyError.
                "codec": self._AUDIO_CODECS.get(attrs.audio_format),
                "type": self._AUDIO_TYPES.get(attrs.code_extension),
            }
        return streams

    def __scan_subtitles(self):
        """Return ``{logical stream id: info}`` for subpicture streams."""
        attrs = self.ffi.new("subp_attr_t*")
        streams = {}
        for n in range(255):
            stream_id = self.lib.dvdnav_get_spu_logical_stream(self.dvd, n)
            if stream_id == -1:
                continue
            self.__check_error(
                self.lib.dvdnav_get_spu_attr(self.dvd, stream_id, attrs)
            )
            slang = None
            if attrs.type == 1:
                slang = str(attrs.lang_code.to_bytes(2, "big"), "utf8")
            streams[stream_id] = {
                "lang": slang,
            }
        return streams

    def open(self, path):
        """Open the DVD at *path* and collect its metadata.

        Sets ``self.dvd``, ``self.title`` and ``self.serial``, fills
        ``self.titles``, and adds ``"audio"`` and ``"subtitles"`` entries to
        every title by briefly starting playback of it.

        Raises:
            DVDError: If libdvdnav reports an error.
        """
        dvdnav = self.ffi.new("dvdnav_t**", self.ffi.cast("dvdnav_t*", 0))
        self.__check_error(self.lib.dvdnav_open(dvdnav, bytes(path, "utf8")))
        self.dvd = dvdnav[0]
        self.__check_error(self.lib.dvdnav_set_readahead_flag(self.dvd, 1))
        self.__get_info()
        for title, info in self.titles.items():
            # Stream attributes are queried for the title currently playing.
            self.__check_error(self.lib.dvdnav_title_play(self.dvd, title))
            info["audio"] = self.__scan_audio()
            info["subtitles"] = self.__scan_subtitles()
        self.__check_error(self.lib.dvdnav_stop(self.dvd))