import json import socket import struct import os from enum import Enum, IntEnum from typing import NamedTuple, Any class ConnType(Enum): SWAY = 'sway' I3 = 'i3' SWAYSOCK = os.getenv('SWAYSOCK') I3SOCK = os.getenv('I3SOCK') class MessageType(IntEnum): # Commands / Replies RUN_COMMAND = 0 GET_WORKSPACES = 1 SUBSCRIBE = 2 GET_OUTPUTS = 3 GET_TREE = 4 GET_MARKS = 5 GET_BAR_CONFIG = 6 GET_VERSION = 7 GET_BINDING_NODES = 8 GET_CONFIG = 9 SEND_TICK = 10 SYNC = 11 GET_BINDING_STATE = 12 GET_INPUTS = 100 GET_SEATS = 101 # Events workspace = 0x80000000 mode = 0x80000002 window = 0x80000003 barconfig_update = 0x80000004 binding = 0x80000005 shutdown = 0x80000006 tick = 0x80000007 bar_state_update = 0x80000014 input = 0x80000015 class Message(NamedTuple): type : MessageType payload : Any MAGIC = 'i3-ipc'.encode('utf-8') HEADER_FORMAT = '=6sII' HEADER_LEN = struct.calcsize(HEADER_FORMAT) def accept(sock): header = sock.recvmsg(Message.HEADER_LEN)[0] magic, len, type = struct.unpack(Message.HEADER_FORMAT, header) if magic != Message.MAGIC: raise ValueError('Protocol error, expected magic value {}, got magic value {}' % (Message.MAGIC, magic)) payload_buf = sock.recvmsg(len)[0] payload = json.loads(payload_buf) return Message(type=type, payload=payload) def send(self, sock): if self.payload is None: payload_buf = bytes([]) elif isinstance(self.payload, str): payload_buf = self.payload.encode('utf-8') else: payload_buf = json.dumps(self.payload).encode('utf-8') payload_len = len(payload_buf) header = struct.pack(Message.HEADER_FORMAT, Message.MAGIC, payload_len, self.type) sock.sendmsg([header, payload_buf]) def __getitem__(self, key): return self.payload[key] class Connection: sock: socket.socket type: ConnType subscription_open: bool = False def __init__(self, type=None, sockfile=None): self.type = type if sockfile is None: if type == ConnType.SWAY or (type is None and SWAYSOCK is not None): sockfile = SWAYSOCK self.type = ConnType.SWAY elif type == ConnType.I3 or (type is None and I3SOCK is not None): sockfile = I3SOCK self.type = ConnType.I3 else: raise ValueError('No compatible window managers found') self.sock = socket.socket( socket.AF_UNIX, socket.SOCK_STREAM,# | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC, ) self.sock.connect(sockfile) def __enter__(self): return self def __exit__(self, *details): self.close() def close(self): self.sock.close() def subscribe(self, events: list): if self.subscription_open: raise ValueError('Subscription already open on socket') reply = self.request(MessageType.SUBSCRIBE, events) if not reply['success']: raise ValueError('Subscription failed') self.subscription_open = True while True: yield Message.accept(self.sock) def exec(self, command: str): return self.request(MessageType.RUN_COMMAND, command) def request(self, type: MessageType, payload: Any = None) -> Message: if self.subscription_open: raise ValueError('Subscription open on socket') message = Message(type=type, payload=payload) message.send(self.sock) reply = Message.accept(self.sock) if reply.type != type: print('Incorrect reply type', reply.type) return reply