# nixos-flake/home-manager/jwm/ptray/i3sway/protocol.py
#
# 133 lines
# 3.9 KiB
# Python

import json
import socket
import struct
import os
from enum import Enum, IntEnum
from typing import NamedTuple, Any
class ConnType(Enum):
    """Window-manager flavour a connection talks to."""

    SWAY = 'sway'
    I3 = 'i3'
# IPC socket paths read from the environment once, at import time.
# Each is None when the corresponding variable is not set.
SWAYSOCK = os.environ.get('SWAYSOCK')
I3SOCK = os.environ.get('I3SOCK')
class MessageType(IntEnum):
    """Numeric message types used in the IPC header.

    Upper-case members are request/reply types. Lower-case members are
    event types; their values all have the highest bit (0x80000000) set.
    """

    # Commands / Replies
    RUN_COMMAND = 0
    GET_WORKSPACES = 1
    SUBSCRIBE = 2
    GET_OUTPUTS = 3
    GET_TREE = 4
    GET_MARKS = 5
    GET_BAR_CONFIG = 6
    GET_VERSION = 7
    # Historical spelling, kept as the canonical member so existing callers
    # and MessageType(8).name are unchanged.
    GET_BINDING_NODES = 8
    # Spelling used by the i3 IPC documentation; alias of the member above.
    GET_BINDING_MODES = 8
    GET_CONFIG = 9
    SEND_TICK = 10
    SYNC = 11
    GET_BINDING_STATE = 12
    # Types 100+ are documented in sway-ipc(7), not in the i3 IPC spec.
    GET_INPUTS = 100
    GET_SEATS = 101

    # Events
    workspace = 0x80000000
    output = 0x80000001
    mode = 0x80000002
    window = 0x80000003
    barconfig_update = 0x80000004
    binding = 0x80000005
    shutdown = 0x80000006
    tick = 0x80000007
    bar_state_update = 0x80000014
    input = 0x80000015
def _recv_exact(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    A single receive call on a stream socket may return fewer bytes than
    requested, so keep reading until the full amount has arrived.

    Raises:
        ConnectionError: the peer closed the socket before ``count`` bytes
            were received.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError(
                'Socket closed with {} of {} bytes still unread'.format(
                    remaining, count))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


class Message(NamedTuple):
    """One IPC message: a numeric type plus a JSON-compatible payload.

    On the wire a message is a fixed header (magic string, payload length,
    message type; see ``HEADER_FORMAT``) followed by the payload bytes.
    """

    # Forward reference: the annotation is not evaluated when the class is
    # created.
    type: 'MessageType'
    payload: Any

    MAGIC = 'i3-ipc'.encode('utf-8')
    # Native byte order, no padding: 6-byte magic, uint32 length, uint32 type.
    HEADER_FORMAT = '=6sII'
    HEADER_LEN = struct.calcsize(HEADER_FORMAT)

    @staticmethod
    def accept(sock):
        """Read one complete message from ``sock`` and return it.

        The payload is decoded as JSON; a zero-length payload yields
        ``None`` (the inverse of what ``send`` does for ``None``). The
        returned ``type`` is the raw integer from the header.

        Raises:
            ValueError: the header does not start with ``MAGIC``, or the
                payload is not valid JSON.
            ConnectionError: the socket was closed mid-message.
        """
        header = _recv_exact(sock, Message.HEADER_LEN)
        magic, payload_len, msg_type = struct.unpack(
            Message.HEADER_FORMAT, header)
        if magic != Message.MAGIC:
            raise ValueError(
                'Protocol error, expected magic value {!r}, '
                'got magic value {!r}'.format(Message.MAGIC, magic))
        payload_buf = _recv_exact(sock, payload_len)
        payload = json.loads(payload_buf) if payload_buf else None
        return Message(type=msg_type, payload=payload)

    def send(self, sock):
        """Serialise this message and write all of it to ``sock``.

        ``None`` is sent as an empty payload, a ``str`` is sent verbatim as
        UTF-8, and anything else is JSON-encoded first.
        """
        if self.payload is None:
            payload_buf = b''
        elif isinstance(self.payload, str):
            payload_buf = self.payload.encode('utf-8')
        else:
            payload_buf = json.dumps(self.payload).encode('utf-8')
        header = struct.pack(
            Message.HEADER_FORMAT, Message.MAGIC, len(payload_buf), self.type)
        # sendall retries until every byte is written; a bare send/sendmsg
        # may stop short and leave the stream desynchronised.
        sock.sendall(header + payload_buf)

    def __getitem__(self, key):
        """Index into the payload (not into the tuple fields)."""
        return self.payload[key]
class Connection:
    """A connected IPC socket with request and subscribe helpers.

    Usable as a context manager; the socket is closed on exit.
    """

    sock: socket.socket
    type: ConnType
    # Class-level default; set on the instance once a subscription succeeds
    # and never reset afterwards.
    subscription_open: bool = False

    def __init__(self, type=None, sockfile=None):
        """Connect to ``sockfile``, or to a socket found via the environment.

        Args:
            type: ``ConnType`` to connect to. When ``None`` and no
                ``sockfile`` is given, sway is tried first, then i3,
                depending on which of ``SWAYSOCK`` / ``I3SOCK`` is set.
            sockfile: explicit socket path; skips the environment lookup
                and leaves ``self.type`` equal to ``type``.

        Raises:
            ValueError: no socket path could be determined.
            OSError: connecting to the socket failed.
        """
        self.type = type
        if sockfile is None:
            if type == ConnType.SWAY or (type is None and SWAYSOCK is not None):
                sockfile = SWAYSOCK
                self.type = ConnType.SWAY
            elif type == ConnType.I3 or (type is None and I3SOCK is not None):
                sockfile = I3SOCK
                self.type = ConnType.I3
            else:
                raise ValueError('No compatible window managers found')
            if sockfile is None:
                # An explicit type was requested but its variable is unset;
                # fail here rather than passing None to connect().
                raise ValueError(
                    'No socket path known for {}: its socket environment '
                    'variable is not set'.format(self.type))
        # Blocking stream socket: the read/write helpers rely on blocking I/O.
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.sock.connect(sockfile)
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            self.sock.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, *details):
        self.close()

    def close(self):
        """Close the underlying socket."""
        self.sock.close()

    def subscribe(self, events: list):
        """Subscribe to ``events`` and yield each message that arrives.

        This is a generator: nothing is sent until it is first iterated.
        Once the subscription succeeds it reads messages from the socket
        and yields them indefinitely.

        Raises:
            ValueError: a subscription is already open on this connection,
                or the reply's ``'success'`` entry is falsy.
        """
        if self.subscription_open:
            raise ValueError('Subscription already open on socket')
        reply = self.request(MessageType.SUBSCRIBE, events)
        if not reply['success']:
            raise ValueError('Subscription failed')
        self.subscription_open = True
        while True:
            yield Message.accept(self.sock)

    def exec(self, command: str):
        """Send ``command`` as a RUN_COMMAND request and return the reply."""
        return self.request(MessageType.RUN_COMMAND, command)

    def request(self, type: MessageType, payload: Any = None) -> Message:
        """Send one request and return the next message read from the socket.

        A reply whose type differs from the request type is reported with
        ``print`` but still returned.

        Raises:
            ValueError: a subscription is open on this connection.
        """
        if self.subscription_open:
            raise ValueError('Subscription open on socket')
        message = Message(type=type, payload=payload)
        message.send(self.sock)
        reply = Message.accept(self.sock)
        if reply.type != type:
            print('Incorrect reply type', reply.type)
        return reply