import functools
import os
from datetime import timedelta

import cffi
from tqdm import tqdm

from dvdread import DVDRead


def loadlib(dll_path, *includes, **kwargs):
    """Declare the given header files to a new cffi FFI and open a library.

    Args:
        dll_path: Path or name handed to ``ffi.dlopen``.
        *includes: Paths of header files whose text is passed to ``ffi.cdef``.
        **kwargs: Extra options collected by the caller (see the note below).

    Returns:
        A ``(ffi, lib)`` tuple.
    """
    ffi = cffi.FFI()
    for include in includes:
        # Close each header promptly instead of leaking the file handle.
        with open(include) as header:
            source = header.read()
        # NOTE(review): ``kwargs`` is passed as cdef()'s second *positional*
        # argument (``override``), not expanded with ``**``.  With the
        # ``pack=True`` used by DVDNav this means override is truthy and no
        # packing option is applied.  That looks unintended, but the runtime
        # behaviour is kept as-is because switching to ``**kwargs`` changes
        # struct layout/override handling and needs testing against the DLL.
        ffi.cdef(source, kwargs)
    return ffi, ffi.dlopen(dll_path)


domains = {
    0: "None",
    1: "FirstPlay",
    2: "VTSTitle",
    4: "VMGM",
    8: "VTSMenu",
}

events = {
    0: "DVDNAV_BLOCK_OK",
    1: "DVDNAV_NOP",
    2: "DVDNAV_STILL_FRAME",
    3: "DVDNAV_SPU_STREAM_CHANGE",
    4: "DVDNAV_AUDIO_STREAM_CHANGE",
    5: "DVDNAV_VTS_CHANGE",
    6: "DVDNAV_CELL_CHANGE",
    7: "DVDNAV_NAV_PACKET",
    8: "DVDNAV_STOP",
    9: "DVDNAV_HIGHLIGHT",
    10: "DVDNAV_SPU_CLUT_CHANGE",
    12: "DVDNAV_HOP_CHANNEL",
    13: "DVDNAV_WAIT",
}

# Domain value labelled "VTSMenu" in ``domains``; reaching it ends a rip
# ("VTS Back to menu!").
_DOMAIN_VTS_MENU = 8

# Lookup tables for the ``audio_format`` / ``code_extension`` fields read in
# DVDNav.open().  Values not listed here are reported as None.
_AUDIO_CODECS = {0: "ac3", 2: "mpeg1", 3: "mpeg-2ext", 4: "lpcm", 6: "dts"}
_AUDIO_TYPES = {
    0: None,
    1: "normal",
    2: "descriptive",
    3: "director's commentary",
    4: "alternate director's commentary",
}


class DVDError(Exception):
    pass


class DVDNav(object):
    """Thin wrapper around a libdvdnav handle loaded through cffi.

    On construction the library is loaded, the disc at ``path`` is opened and
    ``self.titles`` is filled with per-title parts/angles/duration/chapters
    plus audio and subtitle stream descriptions.
    """

    def __init__(self, path, verbose=None, method="disc"):
        """Load libdvdnav and open ``path``.

        Args:
            path: Location handed to ``dvdnav_open`` (encoded as UTF-8).
            verbose: If None, ``DVDCSS_VERBOSE`` is removed from the
                environment; otherwise it is set to ``str(verbose)``.
            method: Value written to the ``DVDCSS_METHOD`` environment variable.

        Raises:
            DVDError: If a checked libdvdnav call reports an error.
        """
        # Initialise first so __del__ is safe even if loading/opening fails.
        self.dvd = None
        self.lib = None
        if verbose is None:
            os.environ.pop("DVDCSS_VERBOSE", None)
        else:
            os.environ["DVDCSS_VERBOSE"] = str(verbose)
        os.environ["DVDCSS_METHOD"] = method
        self.ffi, self.lib = loadlib(
            "libdvdnav-4.dll",
            "dvd_types.h",
            "dvd_reader.h",
            "ifo_types.h",
            "nav_types.h",
            "dvdnav_events.h",
            "dvdnav.h",
            pack=True,
        )
        self.path = path
        self.titles = {}
        self.open(path)

    def __del__(self):
        """Close the libdvdnav handle if one was opened."""
        dvd = getattr(self, "dvd", None)
        lib = getattr(self, "lib", None)
        if dvd is None or lib is None:
            # Nothing was opened (loading or dvdnav_open failed earlier).
            return
        # Drop the handle before checking the result so a failure is reported
        # as "Unknown error" rather than querying a handle that was just closed.
        self.dvd = None
        self.__check_error(lib.dvdnav_close(dvd))

    def __repr__(self):
        # NOTE(review): in the reviewed text this return string was cut off
        # right after its opening quote, so the original format is unknown.
        # This is a stand-in built only from attributes this class sets;
        # restore the original from version control if it differs.
        return "<DVDNav path={!r} title={!r} serial={!r}>".format(
            getattr(self, "path", None),
            getattr(self, "title", None),
            getattr(self, "serial", None),
        )

    def get_blocks(self, title, angle=1, slang=None):
        """Yield the data blocks of ``title`` as ``bytes`` while showing progress.

        NOTE(review): the method header and the statements up to the angle
        range check were missing from the reviewed text.  The name, the
        parameter defaults and the set-up calls (mirroring ``__get_vts``) are
        a reconstruction -- confirm against version control before merging.

        Args:
            title: Title number passed to ``dvdnav_title_play``.
            angle: Angle to select; must lie within the reported angle count.
            slang: Optional value passed to ``dvdnav_spu_language_select``.

        Yields:
            A ``bytes`` copy of every block reported as ``DVDNAV_BLOCK_OK``.

        Raises:
            DVDError: On an invalid angle or a failing checked libdvdnav call.
        """
        lib = self.lib
        # --- reconstructed span starts (see NOTE above) ---
        self.__check_error(lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(lib.dvdnav_title_play(self.dvd, title))
        curr_angle = self.ffi.new("int32_t*", 0)
        num_angles = self.ffi.new("int32_t*", 0)
        self.__check_error(
            lib.dvdnav_get_angle_info(self.dvd, curr_angle, num_angles)
        )
        if angle < 1 or angle > num_angles[0]:
            # --- reconstructed span ends; the rest comes from SOURCE ---
            raise DVDError("Invalid angle specified!")
        if angle != curr_angle[0]:
            self.__check_error(lib.dvdnav_angle_change(self.dvd, angle))
        if slang is not None:
            self.__check_error(lib.dvdnav_spu_language_select(self.dvd, slang))

        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        progbar = tqdm(
            unit_divisor=1024,
            unit_scale=True,
            unit="iB",
            desc="Ripping DVD",
            disable=False,
        )
        ripped = set()  # (vtsN, domain) pairs already left behind
        cells = set()  # fingerprints of cells seen in the current VTS
        current_vts = (None, None)
        current_cell = None
        current_pg = None
        # Events that need no action.  The original handled several of these
        # in separate branches that did nothing observable.
        no_op_events = (
            lib.DVDNAV_SPU_CLUT_CHANGE,
            lib.DVDNAV_HOP_CHANNEL,
            lib.DVDNAV_NOP,
            lib.DVDNAV_HIGHLIGHT,
            lib.DVDNAV_NAV_PACKET,
            lib.DVDNAV_SPU_STREAM_CHANGE,
            lib.DVDNAV_AUDIO_STREAM_CHANGE,
        )
        try:
            while True:
                self.__check_error(
                    lib.dvdnav_get_next_block(self.dvd, buf, ev, size)
                )
                if (
                    lib.dvdnav_get_position(self.dvd, pos, total_size)
                    == lib.DVDNAV_STATUS_OK
                ):
                    # Positions are scaled by 2048 (presumably bytes per
                    # block -- TODO confirm).  Never move the bar backwards.
                    progbar.total = total_size[0] * 2048
                    progbar.n = max(progbar.n, min(progbar.total, pos[0] * 2048))
                    progbar.update(0)
                # NOTE(review): indentation was lost in SOURCE; the postfix is
                # assumed to refresh on every iteration, not only when the
                # position query succeeds.
                progbar.set_postfix(
                    vts=current_vts,
                    cell=current_cell,
                    pg=current_pg,
                    angle=angle,
                    title=title,
                )
                event = ev[0]
                if event in no_op_events:
                    continue
                if event == lib.DVDNAV_BLOCK_OK:
                    # Copy out of the shared buffer; it is reused next call.
                    yield self.ffi.buffer(buf, size[0])[:]
                elif event == lib.DVDNAV_STOP:
                    progbar.write(f"[{title}|{angle}] Stop")
                    break
                elif event == lib.DVDNAV_STILL_FRAME:
                    self.__check_error(lib.dvdnav_still_skip(self.dvd))
                elif event == lib.DVDNAV_WAIT:
                    self.__check_error(lib.dvdnav_wait_skip(self.dvd))
                elif event == lib.DVDNAV_CELL_CHANGE:
                    cell = self.ffi.cast("dvdnav_cell_change_event_t*", buf)
                    current_cell = cell.cellN
                    current_pg = cell.pgN
                    progbar.write(
                        f"[{title}|{angle}] Cell: {cell.cellN} ({hex(cell.cell_start)}-{hex(cell.cell_start+cell.cell_length)}), PG: {cell.pgN} ({hex(cell.pg_start)}-{hex(cell.pg_start+cell.pg_length)})"
                    )
                    fp = (
                        current_vts[0],
                        current_vts[1],
                        cell.cellN,
                        cell.pgN,
                        cell.cell_length,
                        cell.pg_length,
                        cell.pgc_length,
                        cell.cell_start,
                        cell.pg_start,
                    )
                    # Seeing the same cell fingerprint twice means playback
                    # is looping.
                    if fp in cells:
                        progbar.write(f"[{title}|{angle}] Cells Looped!")
                        break
                    cells.add(fp)
                elif event == lib.DVDNAV_VTS_CHANGE:
                    vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                    # Unlisted domain values are labelled "Unknown" instead of
                    # raising KeyError in the middle of a rip.
                    old_domain = domains.get(vts.old_domain, "Unknown")
                    new_domain = domains.get(vts.new_domain, "Unknown")
                    old_vts = (vts.old_vtsN, vts.old_domain)
                    new_vts = (vts.new_vtsN, vts.new_domain)
                    ripped.add(old_vts)
                    cells.clear()
                    progbar.write(
                        f"[{title}|{angle}] VTS: {vts.old_vtsN} ({vts.old_domain} {old_domain}) -> {vts.new_vtsN} ({vts.new_domain} {new_domain})"
                    )
                    # old_vts was just added to ``ripped``, so this one test
                    # also covers the "new == old" case.
                    if new_vts in ripped:
                        progbar.write(f"[{title}|{angle}] VTS Looped!")
                        break
                    current_vts = new_vts
                    if vts.new_domain == _DOMAIN_VTS_MENU:
                        progbar.write(f"[{title}|{angle}] VTS Back to menu!")
                        break
                else:
                    progbar.write(
                        f"[{title}|{angle}] Unhandled: {events.get(ev[0],ev[0])} {size[0]}"
                    )
        finally:
            # Also runs when the consumer stops iterating early or an error
            # propagates, so playback is stopped and the bar is closed.
            try:
                self.__check_error(lib.dvdnav_stop(self.dvd))
            finally:
                progbar.close()

    def __check_error(self, ret):
        """Raise DVDError if ``ret`` equals ``DVDNAV_STATUS_ERR``.

        With an open handle the message comes from ``dvdnav_err_to_string``
        (decoded to ``str``); otherwise it is "Unknown error".
        """
        if ret != self.lib.DVDNAV_STATUS_ERR:
            return
        if self.dvd:
            err = self.ffi.string(self.lib.dvdnav_err_to_string(self.dvd))
            # ffi.string() on a char* yields bytes; decode so the exception
            # text is not shown as b'...'.
            raise DVDError(err.decode("utf8", "replace"))
        raise DVDError("Unknown error")

    def __get_vts(self, title):
        """Play ``title`` and return the VTS changes seen as a list.

        Each entry is ``(new_vtsN, domain_name)``.  Playback skips forward
        with ``dvdnav_next_pg_search`` after every data block and ends on
        STOP, when that search fails, or on a change into the menu domain.
        """
        lib = self.lib
        buf = self.ffi.new("char[]", 4096)
        ev = self.ffi.new("int32_t*", lib.DVDNAV_NOP)
        size = self.ffi.new("int32_t*", 0)
        pos = self.ffi.new("uint32_t*", 0)
        total_size = self.ffi.new("uint32_t*", 0)
        self.__check_error(lib.dvdnav_set_PGC_positioning_flag(self.dvd, 1))
        self.__check_error(lib.dvdnav_title_play(self.dvd, title))
        seq = []
        while True:
            self.__check_error(lib.dvdnav_get_next_block(self.dvd, buf, ev, size))
            event = ev[0]
            if event == lib.DVDNAV_BLOCK_OK:
                self.__check_error(
                    lib.dvdnav_get_position(self.dvd, pos, total_size)
                )
                # NOTE(review): nesting assumed (indentation lost in SOURCE).
                if lib.dvdnav_next_pg_search(self.dvd) == lib.DVDNAV_STATUS_ERR:
                    break
            elif event == lib.DVDNAV_STOP:
                break
            elif event == lib.DVDNAV_STILL_FRAME:
                self.__check_error(lib.dvdnav_still_skip(self.dvd))
            elif event == lib.DVDNAV_WAIT:
                self.__check_error(lib.dvdnav_wait_skip(self.dvd))
            elif event == lib.DVDNAV_VTS_CHANGE:
                vts = self.ffi.cast("dvdnav_vts_change_event_t*", buf)
                seq.append(
                    (vts.new_vtsN, domains.get(vts.new_domain, "Unknown"))
                )
                if vts.new_domain == _DOMAIN_VTS_MENU:
                    break
        self.__check_error(lib.dvdnav_stop(self.dvd))
        return seq

    def __get_titles(self):
        """Fill ``self.titles`` with parts, angles, duration and chapters.

        Titles for which the part count cannot be read, or whose reported
        duration is 0, are skipped.
        """
        lib = self.lib
        count = self.ffi.new("int32_t*", 0)
        p_times = self.ffi.new("uint64_t[]", 512)
        times = self.ffi.new("uint64_t**", p_times)
        duration = self.ffi.new("uint64_t*", 0)
        lib.dvdnav_get_number_of_titles(self.dvd, count)
        num_titles = count[0]
        for title in range(0, num_titles + 1):
            if (
                lib.dvdnav_get_number_of_parts(self.dvd, title, count)
                == lib.DVDNAV_STATUS_ERR
            ):
                continue
            num_parts = count[0]
            lib.dvdnav_get_number_of_angles(self.dvd, title, count)
            num_angles = count[0]
            # NOTE(review): libdvdnav appears to allocate the array it stores
            # in ``times[0]`` and nothing here releases it -- confirm
            # ownership against the library documentation.
            num_chapters = lib.dvdnav_describe_title_chapters(
                self.dvd, title, times, duration
            )
            if duration[0] == 0:
                continue
            if num_chapters == 0 and times[0] == self.ffi.NULL:
                chapters = None
            else:
                # Times are divided by 90000 to get seconds (presumably
                # 90 kHz clock ticks -- TODO confirm).
                chapters = [
                    timedelta(seconds=times[0][t] / 90000)
                    for t in range(num_chapters)
                ]
            self.titles[title] = {
                "parts": num_parts,
                "angles": num_angles,
                "duration": timedelta(seconds=duration[0] / 90000),
                "chapters": chapters,
            }

    def _read_string(self, getter):
        """Call ``getter(dvd, char**)`` and return the stripped text or None.

        None is returned when the call reports an error, leaves the pointer
        NULL, or yields only whitespace.
        """
        out = self.ffi.new("char**", self.ffi.NULL)
        status = getter(self.dvd, out)
        if status == self.lib.DVDNAV_STATUS_ERR or out[0] == self.ffi.NULL:
            return None
        # Undecodable bytes are replaced rather than aborting the open.
        return self.ffi.string(out[0]).decode("utf8", "replace").strip() or None

    def __get_info(self):
        """Read the disc title and serial strings, then the title table."""
        self.title = self._read_string(self.lib.dvdnav_get_title_string)
        self.serial = self._read_string(self.lib.dvdnav_get_serial_string)
        self.__get_titles()

    @staticmethod
    def _decode_lang(code):
        """Turn a 16-bit language code into its two-character string."""
        return code.to_bytes(2, "big").decode("utf8", "replace")

    def _read_audio_streams(self, audio_attrs):
        """Return ``{stream_id: info}`` for the audio streams of the playing title."""
        streams = {}
        for n in range(255):
            stream_id = self.lib.dvdnav_get_audio_logical_stream(self.dvd, n)
            if stream_id == -1:
                continue
            self.__check_error(
                self.lib.dvdnav_get_audio_attr(self.dvd, stream_id, audio_attrs)
            )
            alang = None
            if audio_attrs.lang_type:
                alang = self._decode_lang(audio_attrs.lang_code)
            streams[stream_id] = {
                "lang": alang,
                "channels": audio_attrs.channels + 1,
                # .get(): an unlisted value yields None instead of KeyError.
                "codec": _AUDIO_CODECS.get(audio_attrs.audio_format),
                "type": _AUDIO_TYPES.get(audio_attrs.code_extension),
            }
        return streams

    def _read_subtitle_streams(self, spu_attr):
        """Return ``{stream_id: info}`` for the subpicture streams of the playing title."""
        streams = {}
        for n in range(255):
            stream_id = self.lib.dvdnav_get_spu_logical_stream(self.dvd, n)
            if stream_id == -1:
                continue
            self.__check_error(
                self.lib.dvdnav_get_spu_attr(self.dvd, stream_id, spu_attr)
            )
            slang = None
            if spu_attr.type == 1:
                slang = self._decode_lang(spu_attr.lang_code)
            streams[stream_id] = {"lang": slang}
        return streams

    def open(self, path):
        """Open ``path`` with libdvdnav and collect disc and stream metadata.

        Sets ``self.dvd``, ``self.title``, ``self.serial`` and fills
        ``self.titles`` (each entry gains "audio" and "subtitles" dicts).

        Raises:
            DVDError: If a checked libdvdnav call reports an error.
        """
        audio_attrs = self.ffi.new("audio_attr_t*")
        spu_attr = self.ffi.new("subp_attr_t*")
        dvdnav = self.ffi.new("dvdnav_t**", self.ffi.cast("dvdnav_t*", 0))
        self.__check_error(self.lib.dvdnav_open(dvdnav, bytes(path, "utf8")))
        self.dvd = dvdnav[0]
        self.__check_error(self.lib.dvdnav_set_readahead_flag(self.dvd, 1))
        self.__get_info()
        for title, info in self.titles.items():
            # The stream queries below describe the title that is playing.
            self.__check_error(self.lib.dvdnav_title_play(self.dvd, title))
            info["audio"] = self._read_audio_streams(audio_attrs)
            info["subtitles"] = self._read_subtitle_streams(spu_attr)
        # NOTE(review): indentation was lost in SOURCE; the stop is assumed to
        # run once after all titles rather than once per title.
        self.__check_error(self.lib.dvdnav_stop(self.dvd))