Choggbuster/dvdnav.py

212 lines
9.3 KiB
Python
Raw Normal View History

import cffi
import os
import functools
from datetime import timedelta
from tqdm import tqdm
def loadlib(dll_path,*includes,**kwargs):
ffi = cffi.FFI()
for include in includes:
ffi.cdef(open(include).read(),kwargs)
return ffi,ffi.dlopen(dll_path)
class DVDError(Exception):
pass
class DVDNav(object):
def __init__(self,path,verbose=2,method="disc"):
os.environ["DVDCSS_VERBOSE"]=str(verbose)
os.environ["DVDCSS_METHOD"]=method
self.dvd=None
self.ffi,self.lib = loadlib("libdvdnav-4.dll",
"dvd_types.h",
"dvd_reader.h",
"ifo_types.h",
"nav_types.h",
"dvdnav_events.h",
"dvdnav.h")
self.path=path
self.titles={}
self.open(path)
def __del__(self):
self.__check_error(self.lib.dvdnav_close(self.dvd))
self.dvd=None
def __repr__(self):
return "<DVD Path={0.path} Title={0.title} Serial={0.serial}".format(self)
def get_blocks(self,title,angle=1,slang=None):
self.__check_error(self.lib.dvdnav_set_PGC_positioning_flag(self.dvd,1))
self.__check_error(self.lib.dvdnav_title_play(self.dvd,title))
self.__check_error(self.lib.dvdnav_angle_change(self.dvd,angle))
if slang is not None:
self.__check_error(self.lib.dvdnav_spu_language_select(self.dvd,slang))
event=self.lib.DVDNAV_NOP
buf=self.ffi.new("char[]",4096)
ev=self.ffi.new("int32_t*",self.lib.DVDNAV_NOP)
size=self.ffi.new("int32_t*",0)
pos=self.ffi.new("uint32_t*",0)
total_size=self.ffi.new("uint32_t*",0)
domains = {
1: "FirstPlay",
2: "VTSTitle",
4: "VMGM",
8: "VTSMenu"
}
events={
0: "DVDNAV_BLOCK_OK",
1: "DVDNAV_NOP",
2: "DVDNAV_STILL_FRAME",
3: "DVDNAV_SPU_STREAM_CHANGE",
4: "DVDNAV_AUDIO_STREAM_CHANGE",
5: "DVDNAV_VTS_CHANGE",
6: "DVDNAV_CELL_CHANGE",
7: "DVDNAV_NAV_PACKET",
8: "DVDNAV_STOP",
9: "DVDNAV_HIGHLIGHT",
10: "DVDNAV_SPU_CLUT_CHANGE",
12: "DVDNAV_HOP_CHANNEL",
13: "DVDNAV_WAIT"
}
progbar=tqdm(unit_divisor=1024,unit_scale=True,unit="iB",desc="Ripping DVD")
while True:
self.__check_error(self.lib.dvdnav_get_next_block(self.dvd,buf,ev,size))
if self.lib.dvdnav_get_position(self.dvd,pos,total_size)==self.lib.DVDNAV_STATUS_OK:
progbar.total=total_size[0]*2048
progbar.n=max(progbar.n,min(progbar.total,pos[0]*2048))
progbar.update(0)
# print("Got event:",events.get(ev[0],ev[0]),size[0])
if ev[0] in [self.lib.DVDNAV_SPU_CLUT_CHANGE,self.lib.DVDNAV_HOP_CHANNEL,self.lib.DVDNAV_NOP]:
continue
elif ev[0]==self.lib.DVDNAV_BLOCK_OK:
# print("Read",size[0])
yield self.ffi.buffer(buf,size[0])[:]
elif ev[0]==self.lib.DVDNAV_STOP:
break
elif ev[0]==self.lib.DVDNAV_NAV_PACKET:
nav=self.lib.dvdnav_get_current_nav_pci(self.dvd)
# print("PTS:",timedelta(seconds=nav.pci_gi.vobu_s_ptm/90000))
elif ev[0]==self.lib.DVDNAV_STILL_FRAME:
# print("Still")
self.__check_error(self.lib.dvdnav_still_skip(self.dvd))
elif ev[0]==self.lib.DVDNAV_WAIT:
# print("Wait",size[0])
self.__check_error(self.lib.dvdnav_wait_skip(self.dvd))
elif ev[0]==self.lib.DVDNAV_SPU_STREAM_CHANGE:
spu=self.ffi.cast("dvdnav_spu_stream_change_event_t*",buf)
progbar.write(f"[{title}|{angle}] SPU: Wide: {spu.physical_wide} Letterbox: {spu.physical_letterbox} Pan&Scan: {spu.physical_pan_scan} Logical: {spu.logical}")
elif ev[0]==self.lib.DVDNAV_AUDIO_STREAM_CHANGE:
audio=self.ffi.cast("dvdnav_audio_stream_change_event_t*",buf)
progbar.write(f"[{title}|{angle}] Audio: Physical: {audio.physical} Logical: {audio.logical}")
elif ev[0]==self.lib.DVDNAV_CELL_CHANGE:
cell=self.ffi.cast("dvdnav_cell_change_event_t*",buf)
progbar.write(f"[{title}|{angle}] Cell: {cell.cellN} ({cell.cell_start}-{cell.cell_start+cell.cell_length}), PG: {cell.pgN} ({cell.pg_start}-{cell.pg_start+cell.pg_length})")
elif ev[0]==self.lib.DVDNAV_VTS_CHANGE:
vts=self.ffi.cast("dvdnav_vts_change_event_t*",buf)
old_domain=sorted(domains[k] for k in domains if vts.old_domain&k)
new_domain=sorted(domains[k] for k in domains if vts.new_domain&k)
progbar.write(f"[{title}|{angle}] VTS: {vts.old_vtsN} ({vts.old_domain} {old_domain}) -> {vts.new_vtsN} ({vts.new_domain} {new_domain})")
if vts.new_domain==8: # back to menu
break
else:
progbar.write(f"[{title}|{angle}] Unhandled: {events.get(ev[0],ev[0])} {size[0]}")
def __check_error(self,ret):
if ret==self.lib.DVDNAV_STATUS_ERR:
if self.dvd:
err=self.ffi.string(self.lib.dvdnav_err_to_string(self.dvd))
raise DVDError(err)
raise DVDError("Unknown error")
def __get_titles(self):
titles=self.ffi.new("int32_t*",0)
p_times=self.ffi.new("uint64_t[]",512)
times=self.ffi.new("uint64_t**",p_times)
duration=self.ffi.new("uint64_t*",0)
titles=self.ffi.new("int32_t*",0)
self.lib.dvdnav_get_number_of_titles(self.dvd,titles)
num_titles=titles[0]
for title in range(0,num_titles+1):
if self.lib.dvdnav_get_number_of_parts(self.dvd,title,titles)==0:
continue
num_parts=titles[0]
self.lib.dvdnav_get_number_of_angles(self.dvd,title,titles)
num_angles=titles[0]
num_chapters=self.lib.dvdnav_describe_title_chapters(self.dvd,title,times,duration)
if duration[0]==0:
continue
chapters=[]
for t in range(num_chapters):
chapters.append(timedelta(seconds=times[0][t]/90000))
self.titles[title]={
'parts':num_parts,
'angles': num_angles,
'duration': timedelta(seconds=duration[0]/90000),
'chapters': chapters
}
def __get_info(self):
s=self.ffi.new("char**",self.ffi.NULL)
self.lib.dvdnav_get_title_string(self.dvd,s)
self.title=str(self.ffi.string(s[0]),"utf8").strip() or None
self.lib.dvdnav_get_serial_string(self.dvd,s)
self.serial=str(self.ffi.string(s[0]),"utf8").strip() or None
self.__get_titles()
def open(self,path):
audio_attrs=self.ffi.new("audio_attr_t*")
spu_attr=self.ffi.new("subp_attr_t*")
dvdnav=self.ffi.new("dvdnav_t**",self.ffi.cast("dvdnav_t*",0))
self.__check_error(self.lib.dvdnav_open(dvdnav,bytes(path,"utf8")))
self.dvd=dvdnav[0]
self.__check_error(self.lib.dvdnav_set_readahead_flag(self.dvd,1))
self.__get_info()
for title in self.titles:
self.__check_error(self.lib.dvdnav_title_play(self.dvd,title))
self.titles[title]['audio']={}
self.titles[title]['subtitles']={}
for n in range(255):
stream_id=self.lib.dvdnav_get_audio_logical_stream(self.dvd,n)
if stream_id==-1:
continue
self.__check_error(self.lib.dvdnav_get_audio_attr(self.dvd,stream_id,audio_attrs))
alang=None
if audio_attrs.lang_type:
alang = str(audio_attrs.lang_code.to_bytes(2,'big'),"utf8")
channels = audio_attrs.channels+1
codec = {
0: 'ac3',
2: 'mpeg1',
3: 'mpeg-2ext',
4: 'lpcm',
6: 'dts'
}[audio_attrs.audio_format]
audio_type={
0: None,
1: 'normal',
2: 'descriptive',
3: "director's commentary",
4: "alternate director's commentary"
}[audio_attrs.code_extension]
self.titles[title]['audio'][n]={
'stream_id': stream_id,
'lang':alang,
'channels': channels,
'codec': codec,
'type': audio_type
}
for n in range(255):
stream_id=self.lib.dvdnav_get_spu_logical_stream(self.dvd,n)
if stream_id==-1:
continue
self.__check_error(self.lib.dvdnav_get_spu_attr(self.dvd,stream_id,spu_attr))
slang = None
if spu_attr.type==1:
slang = str(spu_attr.lang_code.to_bytes(2,'big'),"utf8")
self.titles[title]['subtitles'][n]={
'stream_id': stream_id,
'lang':slang
}
self.__check_error(self.lib.dvdnav_stop(self.dvd))