133 lines
3.9 KiB
Python
133 lines
3.9 KiB
Python
import json
|
|
import socket
|
|
import struct
|
|
import os
|
|
|
|
from enum import Enum, IntEnum
|
|
from typing import NamedTuple, Any
|
|
|
|
class ConnType(Enum):
    """Kind of IPC endpoint a connection talks to.

    The member values are the lower-case names of the two supported
    targets.
    """

    # Member order is part of the enum's iteration order; keep it stable.
    SWAY = 'sway'
    I3 = 'i3'
|
|
|
|
# IPC socket paths, read from the environment once at import time.
# Each one is None when the corresponding variable is not set.
SWAYSOCK = os.environ.get('SWAYSOCK')
I3SOCK = os.environ.get('I3SOCK')
|
|
|
|
class MessageType(IntEnum):
    """Numeric message types carried in the IPC message header.

    Upper-case members are request/reply types. Lower-case members are
    event types; every event value has the high bit (0x80000000) set.
    """

    # Commands / Replies
    RUN_COMMAND = 0
    GET_WORKSPACES = 1
    SUBSCRIBE = 2
    GET_OUTPUTS = 3
    GET_TREE = 4
    GET_MARKS = 5
    GET_BAR_CONFIG = 6
    GET_VERSION = 7
    # Historical misspelling, kept as the canonical member so that existing
    # callers and MessageType(8).name keep working.
    GET_BINDING_NODES = 8
    # Correctly spelled alias for the same request (binding *modes*).
    GET_BINDING_MODES = 8
    GET_CONFIG = 9
    SEND_TICK = 10
    SYNC = 11
    GET_BINDING_STATE = 12
    GET_INPUTS = 100
    GET_SEATS = 101

    # Events
    workspace = 0x80000000
    output = 0x80000001
    mode = 0x80000002
    window = 0x80000003
    barconfig_update = 0x80000004
    binding = 0x80000005
    shutdown = 0x80000006
    tick = 0x80000007
    bar_state_update = 0x80000014
    input = 0x80000015
|
|
|
|
class Message(NamedTuple):
    """One IPC message: a numeric type plus its decoded payload.

    On the wire a message is a fixed-size header (magic string, payload
    length, message type) followed by exactly ``payload length`` bytes of
    payload.
    """

    type: MessageType
    payload: Any

    # Unannotated names below are plain class attributes, not tuple fields.
    MAGIC = 'i3-ipc'.encode('utf-8')
    # '=' selects native byte order with standard sizes and no padding:
    # 6-byte magic, uint32 payload length, uint32 message type.
    HEADER_FORMAT = '=6sII'
    HEADER_LEN = struct.calcsize(HEADER_FORMAT)

    @staticmethod
    def _recv_exact(sock, count):
        """Read exactly ``count`` bytes from ``sock``.

        A single receive call on a stream socket may return fewer bytes than
        requested, so keep reading until the full amount has arrived.

        Raises:
            ConnectionError: the peer closed the socket before ``count``
                bytes were received.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    'Socket closed while reading message '
                    '({} of {} bytes received)'.format(count - remaining, count)
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def accept(sock):
        """Read one complete message from ``sock`` and decode it.

        Args:
            sock: a connected, blocking stream socket.

        Returns:
            A ``Message`` whose ``type`` is the integer taken from the header
            and whose ``payload`` is the JSON-decoded body, or ``None`` when
            the body is empty.

        Raises:
            ValueError: the header does not start with the expected magic
                value, or the body is not valid JSON.
            ConnectionError: the socket was closed mid-message.
        """
        header = Message._recv_exact(sock, Message.HEADER_LEN)
        magic, payload_len, msg_type = struct.unpack(Message.HEADER_FORMAT, header)
        if magic != Message.MAGIC:
            raise ValueError(
                'Protocol error, expected magic value {}, got magic value {}'
                .format(Message.MAGIC, magic)
            )

        payload_buf = Message._recv_exact(sock, payload_len)
        # send() emits an empty body for a None payload; decode it the same way.
        payload = json.loads(payload_buf) if payload_buf else None

        return Message(type=msg_type, payload=payload)

    def send(self, sock):
        """Serialize this message and write all of it to ``sock``.

        A ``None`` payload is sent as an empty body, a ``str`` payload is
        sent verbatim as UTF-8, and anything else is JSON-encoded first.
        """
        if self.payload is None:
            payload_buf = b''
        elif isinstance(self.payload, str):
            payload_buf = self.payload.encode('utf-8')
        else:
            payload_buf = json.dumps(self.payload).encode('utf-8')

        header = struct.pack(
            Message.HEADER_FORMAT, Message.MAGIC, len(payload_buf), self.type
        )
        # sendall() keeps writing until every byte is out; a single
        # send/sendmsg call may stop after a partial write.
        sock.sendall(header + payload_buf)

    def __getitem__(self, key):
        """Index into the payload (not the tuple fields): ``msg[key]``."""
        return self.payload[key]
|
|
|
|
class Connection:
    """A blocking IPC connection over a Unix stream socket.

    Usable as a context manager; the socket is closed on exit. A connection
    is used either for request/reply traffic or for one event subscription:
    once ``subscribe()`` has started, ``request()`` and further
    ``subscribe()`` calls raise ``ValueError``.
    """

    sock: socket.socket
    type: ConnType
    subscription_open: bool = False

    def __init__(self, type=None, sockfile=None):
        """Connect to an IPC socket.

        Args:
            type: ``ConnType`` to connect to. When ``None`` and no
                ``sockfile`` is given, sway is tried first and then i3,
                based on which socket environment variable is set.
            sockfile: explicit socket path. When given, no lookup is done
                and ``type`` is stored as passed.

        Raises:
            ValueError: no socket path could be determined.
            OSError: connecting to the socket failed.
        """
        self.type = type
        if sockfile is None:
            if type == ConnType.SWAY or (type is None and SWAYSOCK is not None):
                sockfile = SWAYSOCK
                self.type = ConnType.SWAY
            elif type == ConnType.I3 or (type is None and I3SOCK is not None):
                sockfile = I3SOCK
                self.type = ConnType.I3
            else:
                raise ValueError('No compatible window managers found')

            # An explicitly requested type can still have no socket path when
            # its environment variable is unset; fail clearly here instead of
            # handing None to connect().
            if sockfile is None:
                raise ValueError(
                    'No socket path found for {}'.format(self.type.value)
                )

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.sock.connect(sockfile)
        except BaseException:
            # Do not leak the file descriptor when the connection fails.
            self.sock.close()
            raise

    def __enter__(self):
        """Return the connection itself for use in a ``with`` block."""
        return self

    def __exit__(self, *details):
        """Close the socket when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying socket."""
        self.sock.close()

    def subscribe(self, events: list):
        """Subscribe to ``events`` and yield each incoming message.

        This is a generator: nothing is sent until it is first iterated.
        After a successful subscription it yields messages indefinitely.

        Raises:
            ValueError: a subscription is already open on this connection,
                or the reply did not report success.
        """
        if self.subscription_open:
            raise ValueError('Subscription already open on socket')

        reply = self.request(MessageType.SUBSCRIBE, events)
        if not reply['success']:
            raise ValueError('Subscription failed')

        # From here on the socket carries events, so request() is refused.
        self.subscription_open = True
        while True:
            yield Message.accept(self.sock)

    def exec(self, command: str):
        """Send ``command`` as a RUN_COMMAND request and return the reply."""
        return self.request(MessageType.RUN_COMMAND, command)

    def request(self, type: MessageType, payload: Any = None) -> Message:
        """Send one request and return the next message read from the socket.

        A reply whose type differs from the request type is reported with a
        printed notice and still returned.

        Raises:
            ValueError: a subscription is open on this connection.
        """
        if self.subscription_open:
            raise ValueError('Subscription open on socket')
        message = Message(type=type, payload=payload)
        message.send(self.sock)
        reply = Message.accept(self.sock)
        if reply.type != type:
            print('Incorrect reply type', reply.type)
        return reply
|