diff --git a/dance.py b/dance.py index 346b98e..3a78c21 100644 --- a/dance.py +++ b/dance.py @@ -1,3 +1,36 @@ +import os + +def check_dependencies(): + def check_dependency_aiohttp(): + while True: + try: + import aiohttp + except: + os.system('pip3 install aiohttp==3.7.4') + else: + break + + def check_dependency_hidapi(): + while True: + try: + import hid + except: + os.system('pip3 install hidapi'); os.system('pip3 install hid') + else: + break + + def check_dependency_websockets(): + while True: + try: + import websockets + except: + os.system('pip3 install websockets') + else: + break + + check_dependency_aiohttp(); check_dependency_hidapi(); check_dependency_websockets() + + import asyncio import json import logging @@ -13,333 +46,340 @@ import hid from aiohttp import WSMsgType, web from joydance import JoyDance, PairingState -from joydance.constants import (DEFAULT_CONFIG, JOYDANCE_VERSION, - WsSubprotocolVersion) +from joydance.constants import (DEFAULT_CONFIG, JOYDANCE_VERSION, WsSubprotocolVersion) from pycon import ButtonEventJoyCon, JoyCon from pycon.constants import JOYCON_PRODUCT_IDS, JOYCON_VENDOR_ID logging.getLogger('asyncio').setLevel(logging.WARNING) - -class WsCommand(Enum): - GET_JOYCON_LIST = 'get_joycon_list' - CONNECT_JOYCON = 'connect_joycon' - DISCONNECT_JOYCON = 'disconnect_joycon' - UPDATE_JOYCON_STATE = 'update_joycon_state' - - -class PairingMethod(Enum): - DEFAULT = 'default' - FAST = 'fast' - STADIA = 'stadia' - OLD = 'old' - - -REGEX_PAIRING_CODE = re.compile(r'^\d{6}$') -REGEX_LOCAL_IP_ADDRESS = re.compile(r'^(192\.168|10.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5]))\.((\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.)(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])$') - - -async def get_device_ids(): - devices = hid.enumerate(JOYCON_VENDOR_ID, 0) - - out = [] - for device in devices: - vendor_id = device['vendor_id'] - product_id = device['product_id'] - product_string = device['product_string'] - serial = device.get('serial') or device.get('serial_number') - - if product_id not in JOYCON_PRODUCT_IDS: - continue - - if not product_string: - continue - - out.append({ - 'vendor_id': vendor_id, - 'product_id': product_id, - 'serial': serial, - 'product_string': product_string, - }) - - return out - - -async def get_joycon_list(app): - joycons = [] - devices = await get_device_ids() - - for dev in devices: - if dev['serial'] in app['joycons_info']: - info = app['joycons_info'][dev['serial']] - else: - joycon = JoyCon(dev['vendor_id'], dev['product_id'], dev['serial']) - # Wait for initial data - for _ in range(3): - time.sleep(0.05) - battery_level = joycon.get_battery_level() - if battery_level > 0: - break - - color = '#%02x%02x%02x' % joycon.color_body - - # Temporary fix for Windows - if platform.system() != 'Windows': - joycon.__del__() - - info = { - 'vendor_id': dev['vendor_id'], - 'product_id': dev['product_id'], - 'serial': dev['serial'], - 'name': dev['product_string'], - 'color': color, - 'battery_level': battery_level, - 'is_left': joycon.is_left(), - 'state': PairingState.IDLE.value, - 'pairing_code': '', - } - - app['joycons_info'][dev['serial']] = info - - joycons.append(info) - - return sorted(joycons, key=lambda x: (x['name'], x['color'], x['serial'])) - - -async def connect_joycon(app, ws, data): - async def on_joydance_state_changed(serial, state): - print(serial, state) - app['joycons_info'][serial]['state'] = state.value - try: - await ws_send_response(ws, WsCommand.UPDATE_JOYCON_STATE, app['joycons_info'][serial]) - except Exception as e: - print(e) - - print(data) - - serial = data['joycon_serial'] - product_id = app['joycons_info'][serial]['product_id'] - vendor_id = app['joycons_info'][serial]['vendor_id'] - - pairing_method = data['pairing_method'] - host_ip_addr = data['host_ip_addr'] - console_ip_addr = data['console_ip_addr'] - pairing_code = data['pairing_code'] - - if not is_valid_pairing_method(pairing_method): - return - - if pairing_method == PairingMethod.DEFAULT.value: - if not is_valid_ip_address(host_ip_addr) or not is_valid_pairing_code(pairing_code): - return - - if pairing_method == PairingMethod.FAST.value and not is_valid_ip_address(console_ip_addr): - return - - config_parser = parse_config() - config = dict(config_parser.items('joydance')) - config['pairing_code'] = pairing_code - config['pairing_method'] = pairing_method - config['host_ip_addr'] = host_ip_addr - config['console_ip_addr'] = console_ip_addr - config_parser['joydance'] = config - save_config(config_parser) - - if pairing_method == PairingMethod.DEFAULT.value or pairing_method == PairingMethod.STADIA.value: - app['joycons_info'][serial]['pairing_code'] = pairing_code - console_ip_addr = None - else: - app['joycons_info'][serial]['pairing_code'] = '' - - joycon = ButtonEventJoyCon(vendor_id, product_id, serial) - - if pairing_method == PairingMethod.OLD.value: - protocol_version = WsSubprotocolVersion.V1 - else: - protocol_version = WsSubprotocolVersion.V2 - - joydance = JoyDance( - joycon, - protocol_version=protocol_version, - pairing_code=pairing_code, - host_ip_addr=host_ip_addr, - console_ip_addr=console_ip_addr, - on_state_changed=on_joydance_state_changed, - ) - app['joydance_connections'][serial] = joydance - - asyncio.create_task(joydance.pair()) - - -async def disconnect_joycon(app, ws, data): - print(data) - serial = data['joycon_serial'] - joydance = app['joydance_connections'][serial] - app['joycons_info'][serial]['state'] = PairingState.IDLE.value - - await joydance.disconnect() +def main(): try: - await ws_send_response(ws, WsCommand.UPDATE_JOYCON_STATE, { - 'joycon_serial': serial, - 'state': PairingState.IDLE.value, - }) - except Exception: - pass + class WsCommand(Enum): + GET_JOYCON_LIST = 'get_joycon_list' + CONNECT_JOYCON = 'connect_joycon' + DISCONNECT_JOYCON = 'disconnect_joycon' + UPDATE_JOYCON_STATE = 'update_joycon_state' -def parse_config(): - parser = ConfigParser() - parser.read('config.cfg') - - if 'joydance' not in parser: - parser['joydance'] = DEFAULT_CONFIG - else: - tmp_config = DEFAULT_CONFIG.copy() - for key in tmp_config: - if key in parser['joydance']: - val = parser['joydance'][key] - if key == 'pairing_method': - if not is_valid_pairing_method(val): - val = PairingMethod.DEFAULT.value - elif key == 'host_ip_addr' or key == 'console_ip_addr': - if not(is_valid_ip_address(val)): - val = '' - elif key == 'pairing_code': - if not is_valid_pairing_code(val): - val = '' - elif key.startswith('accel_'): - try: - val = int(val) - except Exception: - val = DEFAULT_CONFIG[key] - - tmp_config[key] = val - - parser['joydance'] = tmp_config - - if not parser['joydance']['host_ip_addr']: - host_ip_addr = get_host_ip() - if host_ip_addr: - parser['joydance']['host_ip_addr'] = host_ip_addr - - save_config(parser) - return parser + class PairingMethod(Enum): + DEFAULT = 'default' + FAST = 'fast' + STADIA = 'stadia' + OLD = 'old' -def is_valid_pairing_code(val): - return re.match(REGEX_PAIRING_CODE, val) is not None + REGEX_PAIRING_CODE = re.compile(r'^\d{6}$') + REGEX_LOCAL_IP_ADDRESS = re.compile(r'^(192\.168|10.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5]))\.((\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.)(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])$') -def is_valid_ip_address(val): - return re.match(REGEX_LOCAL_IP_ADDRESS, val) is not None + async def get_device_ids(): + devices = hid.enumerate(JOYCON_VENDOR_ID, 0) + + out = [] + for device in devices: + vendor_id = device['vendor_id'] + product_id = device['product_id'] + product_string = device['product_string'] + serial = device.get('serial') or device.get('serial_number') + + if product_id not in JOYCON_PRODUCT_IDS: + continue + + if not product_string: + continue + + out.append({ + 'vendor_id': vendor_id, + 'product_id': product_id, + 'serial': serial, + 'product_string': product_string, + }) + + return out -def is_valid_pairing_method(val): - return val in [ - PairingMethod.DEFAULT.value, - PairingMethod.FAST.value, - PairingMethod.STADIA.value, - PairingMethod.OLD.value, - ] + async def get_joycon_list(app): + joycons = [] + devices = await get_device_ids() + + for dev in devices: + if dev['serial'] in app['joycons_info']: + info = app['joycons_info'][dev['serial']] + else: + joycon = JoyCon(dev['vendor_id'], dev['product_id'], dev['serial']) + # Wait for initial data + for _ in range(3): + time.sleep(0.05) + battery_level = joycon.get_battery_level() + if battery_level > 0: + break + + color = '#%02x%02x%02x' % joycon.color_body + + # Temporary fix for Windows + if platform.system() != 'Windows': + joycon.__del__() + + info = { + 'vendor_id': dev['vendor_id'], + 'product_id': dev['product_id'], + 'serial': dev['serial'], + 'name': dev['product_string'], + 'color': color, + 'battery_level': battery_level, + 'is_left': joycon.is_left(), + 'state': PairingState.IDLE.value, + 'pairing_code': '', + } + + app['joycons_info'][dev['serial']] = info + + joycons.append(info) + + return sorted(joycons, key=lambda x: (x['name'], x['color'], x['serial'])) -def get_host_ip(): - try: - for ip in socket.gethostbyname_ex(socket.gethostname())[2]: - if ip.startswith('192.168') or ip.startswith('10.'): - return ip - except Exception: - pass + async def connect_joycon(app, ws, data): + async def on_joydance_state_changed(serial, state): + print(serial, state) + app['joycons_info'][serial]['state'] = state.value + try: + await ws_send_response(ws, WsCommand.UPDATE_JOYCON_STATE, app['joycons_info'][serial]) + except Exception as e: + print(e) - return None + print(data) + + serial = data['joycon_serial'] + product_id = app['joycons_info'][serial]['product_id'] + vendor_id = app['joycons_info'][serial]['vendor_id'] + + pairing_method = data['pairing_method'] + host_ip_addr = data['host_ip_addr'] + console_ip_addr = data['console_ip_addr'] + pairing_code = data['pairing_code'] + + if not is_valid_pairing_method(pairing_method): + return + + if pairing_method == PairingMethod.DEFAULT.value: + if not is_valid_ip_address(host_ip_addr) or not is_valid_pairing_code(pairing_code): + return + + if pairing_method == PairingMethod.FAST.value and not is_valid_ip_address(console_ip_addr): + return + + config_parser = parse_config() + config = dict(config_parser.items('joydance')) + config['pairing_code'] = pairing_code + config['pairing_method'] = pairing_method + config['host_ip_addr'] = host_ip_addr + config['console_ip_addr'] = console_ip_addr + config_parser['joydance'] = config + save_config(config_parser) + + if pairing_method == PairingMethod.DEFAULT.value or pairing_method == PairingMethod.STADIA.value: + app['joycons_info'][serial]['pairing_code'] = pairing_code + console_ip_addr = None + else: + app['joycons_info'][serial]['pairing_code'] = '' + + joycon = ButtonEventJoyCon(vendor_id, product_id, serial) + + if pairing_method == PairingMethod.OLD.value: + protocol_version = WsSubprotocolVersion.V1 + else: + protocol_version = WsSubprotocolVersion.V2 + + joydance = JoyDance( + joycon, + protocol_version=protocol_version, + pairing_code=pairing_code, + host_ip_addr=host_ip_addr, + console_ip_addr=console_ip_addr, + on_state_changed=on_joydance_state_changed, + ) + app['joydance_connections'][serial] = joydance + + asyncio.create_task(joydance.pair()) -def save_config(parser): - with open('config.cfg', 'w') as fp: - parser.write(fp) + async def disconnect_joycon(app, ws, data): + print(data) + serial = data['joycon_serial'] + joydance = app['joydance_connections'][serial] + app['joycons_info'][serial]['state'] = PairingState.IDLE.value - -async def on_startup(app): - print(''' - ░░ ░░░░░░ ░░ ░░ ░░░░░░ ░░░░░ ░░░ ░░ ░░░░░░ ░░░░░░░ - ▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒▒▒ ▒▒ ▒▒ ▒▒ - ▒▒ ▒▒ ▒▒ ▒▒▒▒ ▒▒ ▒▒ ▒▒▒▒▒▒▒ ▒▒ ▒▒ ▒▒ ▒▒ ▒▒▒▒▒ -▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ ▓▓ - █████ ██████ ██ ██████ ██ ██ ██ ████ ██████ ███████ - -Open http://localhost:32623 in your browser.''') - - # Check for update - async with aiohttp.ClientSession() as session: - async with session.get('https://api.github.com/repos/redphx/joydance/releases/latest', ssl=False) as resp: - json_body = await resp.json() - latest_version = json_body['tag_name'][1:] - print('Running version {}.'.format(JOYDANCE_VERSION)) - if JOYDANCE_VERSION != latest_version: - print('\033[93m{}\033[00m'.format('Version {} is available: https://github.com/redphx/joydance'.format(latest_version))) - - -async def html_handler(request): - config = dict((parse_config()).items('joydance')) - with open('static/index.html', 'r') as f: - html = f.read() - html = html.replace('[[CONFIG]]', json.dumps(config)) - html = html.replace('[[VERSION]]', JOYDANCE_VERSION) - return web.Response(text=html, content_type='text/html') - - -async def ws_send_response(ws, cmd, data): - resp = { - 'cmd': 'resp_' + cmd.value, - 'data': data, - } - await ws.send_json(resp) - - -async def websocket_handler(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - async for msg in ws: - if msg.type == WSMsgType.TEXT: - msg = msg.json() + await joydance.disconnect() try: - cmd = WsCommand(msg['cmd']) - except ValueError: - print('Invalid cmd:', msg['cmd']) - continue - - if cmd == WsCommand.GET_JOYCON_LIST: - joycon_list = await get_joycon_list(request.app) - await ws_send_response(ws, cmd, joycon_list) - elif cmd == WsCommand.CONNECT_JOYCON: - await connect_joycon(request.app, ws, msg['data']) - await ws_send_response(ws, cmd, {}) - elif cmd == WsCommand.DISCONNECT_JOYCON: - await disconnect_joycon(request.app, ws, msg['data']) - await ws_send_response(ws, cmd, {}) - elif msg.type == WSMsgType.ERROR: - print('ws connection closed with exception %s' % - ws.exception()) - - return ws + await ws_send_response(ws, WsCommand.UPDATE_JOYCON_STATE, { + 'joycon_serial': serial, + 'state': PairingState.IDLE.value, + }) + except Exception: + pass -def favicon_handler(request): - return web.FileResponse('static/favicon.png') + def parse_config(): + parser = ConfigParser() + parser.read('config.cfg') + + if 'joydance' not in parser: + parser['joydance'] = DEFAULT_CONFIG + else: + tmp_config = DEFAULT_CONFIG.copy() + for key in tmp_config: + if key in parser['joydance']: + val = parser['joydance'][key] + if key == 'pairing_method': + if not is_valid_pairing_method(val): + val = PairingMethod.DEFAULT.value + elif key == 'host_ip_addr' or key == 'console_ip_addr': + if not(is_valid_ip_address(val)): + val = '' + elif key == 'pairing_code': + if not is_valid_pairing_code(val): + val = '' + elif key.startswith('accel_'): + try: + val = int(val) + except Exception: + val = DEFAULT_CONFIG[key] + + tmp_config[key] = val + + parser['joydance'] = tmp_config + + if not parser['joydance']['host_ip_addr']: + host_ip_addr = get_host_ip() + if host_ip_addr: + parser['joydance']['host_ip_addr'] = host_ip_addr + + save_config(parser) + return parser -app = web.Application() -app['joydance_connections'] = {} -app['joycons_info'] = {} + def is_valid_pairing_code(val): + return re.match(REGEX_PAIRING_CODE, val) is not None -app.on_startup.append(on_startup) -app.add_routes([ - web.get('/', html_handler), - web.get('/favicon.png', favicon_handler), - web.get('/ws', websocket_handler), - web.static('/css', 'static/css'), - web.static('/js', 'static/js'), -]) -web.run_app(app, port=32623) + def is_valid_ip_address(val): + return re.match(REGEX_LOCAL_IP_ADDRESS, val) is not None + + + def is_valid_pairing_method(val): + return val in [ + PairingMethod.DEFAULT.value, + PairingMethod.FAST.value, + PairingMethod.STADIA.value, + PairingMethod.OLD.value, + ] + + + def get_host_ip(): + try: + for ip in socket.gethostbyname_ex(socket.gethostname())[2]: + if ip.startswith('192.168') or ip.startswith('10.'): + return ip + except Exception: + pass + + return None + + + def save_config(parser): + with open('config.cfg', 'w') as fp: + parser.write(fp) + + + async def on_startup(app): + os.system('clear') + + # Check for update + async with aiohttp.ClientSession() as session: + async with session.get('https://api.github.com/repos/redphx/joydance/releases/latest', ssl=False) as resp: + json_body = await resp.json() + latest_version = json_body['tag_name'][1:] + print('JoyDance V{}.'.format(JOYDANCE_VERSION)) + if JOYDANCE_VERSION != latest_version: + print('\033[93m{}\033[00m'.format('Version {} is available: https://github.com/redphx/joydance'.format(latest_version))) + + time.sleep(2) + os.system('open "http://localhost:32623"') + + + async def html_handler(request): + config = dict((parse_config()).items('joydance')) + with open('static/index.html', 'r') as f: + html = f.read() + html = html.replace('[[CONFIG]]', json.dumps(config)) + html = html.replace('[[VERSION]]', JOYDANCE_VERSION) + return web.Response(text=html, content_type='text/html') + + + async def ws_send_response(ws, cmd, data): + resp = { + 'cmd': 'resp_' + cmd.value, + 'data': data, + } + await ws.send_json(resp) + + + async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + async for msg in ws: + if msg.type == WSMsgType.TEXT: + msg = msg.json() + try: + cmd = WsCommand(msg['cmd']) + except ValueError: + print('Invalid cmd:', msg['cmd']) + continue + + if cmd == WsCommand.GET_JOYCON_LIST: + joycon_list = await get_joycon_list(request.app) + await ws_send_response(ws, cmd, joycon_list) + elif cmd == WsCommand.CONNECT_JOYCON: + await connect_joycon(request.app, ws, msg['data']) + await ws_send_response(ws, cmd, {}) + elif cmd == WsCommand.DISCONNECT_JOYCON: + await disconnect_joycon(request.app, ws, msg['data']) + await ws_send_response(ws, cmd, {}) + elif msg.type == WSMsgType.ERROR: + print('ws connection closed with exception %s' % + ws.exception()) + + return ws + + + def favicon_handler(request): + return web.FileResponse('static/favicon.png') + + + app = web.Application() + app['joydance_connections'] = {} + app['joycons_info'] = {} + + app.on_startup.append(on_startup) + app.add_routes([ + web.get('/', html_handler), + web.get('/favicon.png', favicon_handler), + web.get('/ws', websocket_handler), + web.static('/css', 'static/css'), + web.static('/js', 'static/js'), + ]) + + web.run_app(app, port=32623) + + except KeyboardInterrupt: + os.system('clear') + return(0) + + except TypeError: + os.system('clear') + return(0) + +check_dependencies() +main()