integrate home-manager config

This commit is contained in:
jaina heartles 2024-07-21 22:36:29 -07:00
parent a470338c43
commit 826a0e5525
45 changed files with 5308 additions and 1 deletions

View file

@ -0,0 +1,18 @@
import os
import json
CONFIG_FILE = os.path.join(os.getenv('XDG_CONFIG_HOME'), 'ptray', 'config.json')
DESKTOP = os.getenv('XDG_CURRENT_DESKTOP')
if DESKTOP == 'i3' or DESKTOP == 'sway':
STATE_DIR = '{}.{}'.format(os.getenv('I3SOCK'), 'ptray')
elif DESKTOP == "hyprland":
STATE_DIR = '{}/{}.ptray'.format(os.getenv("XDG_RUNTIME_DIR"), os.getenv("HYPRLAND_INSTANCE_SIGNATURE"))
else:
raise ValueError('Unsupported value of XDG_CURRENT_DESKTOP: {}'.format(DESKTOP))
def read_config(path=CONFIG_FILE):
with open(path, 'r') as f:
config = json.load(f)
return config

View file

@ -0,0 +1,117 @@
import os
import socket
import json
from typing import NamedTuple, Any
HYPR = "/tmp/hypr/{}".format(os.getenv("HYPRLAND_INSTANCE_SIGNATURE"))
COMMANDSOCK = "{}/.socket.sock".format(HYPR)
EVENTSOCK = "{}/.socket2.sock".format(HYPR)
BUF_SIZE = 2 ** 20
class EventType(Enum):
WORKSPACE = "workspace"
FOCUSED_MON = "focusedmon"
ACTIVE_WINDOW = "activewindow"
ACTIVE_WINDOW_V2 = "activewindowv2"
FULLSCREEN = "fullscreen"
MONITOR_REMOVED = "monitorremoved"
MONITOR_ADDED = "monitoradded"
CREATE_WORKSPACE = "createworkspace"
DESTROY_WORKSPACE = "destroyworkspace"
MOVE_WORKSPACE = "moveworkspace"
ACTIVE_LAYOUT = "activelayout"
OPEN_WINDOW = "openwindow"
CLOSE_WINDOW = "closewindow"
MOVE_WINDOW = "movewindow"
OPEN_LAYER = "openlayer"
CLOSE_LAYER = "closelayer"
SUBMAP = "submap"
CHANGE_FLOATING_MODE = "changefloatingmode"
URGENT = "urgent"
MINIMIZE = "minimize"
SCREENCAST = "screencast"
WINDOW_TITLE = "windowtitle"
class Event(NamedTuple):
type : EventType
value : str
def subscribe(sockfile=EVENTSOCK):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_CLOEXEC)
sock.connect(sockfile)
while True:
msg = sock.recvmsg(BUF_SIZE)[0]
type, value = msg.split('>>', 1)
return Event(type=type, value=value)
def hyprctl(cmd, sockfile=COMMANDSOCK):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_CLOEXEC)
try:
sock.connect(sockfile)
header = 'j/'.encode('utf-8')
payload = cmd.encode('utf-8')
sock.sendmsg([header, payload])
msg = sock.recvmsg(BUF_SIZE)[0]
finally:
sock.close()
return json.loads(msg)
print(json.dumps(hyprctl('clients')))
class Monitor:
event_handler = None
tracked_classes = None
window_ids = None
def __init__(self, event_handler, tracked_classes):
self.event_handler = event_handler
self.tracked_classes = tracked_classes
self.window_ids = {name: set() for name in tracked_classes}
windows = hyprctl('clients')
monitors = hyprctl('monitors')
open_workspaces = [m['specialWorkspace']['id'] for m in monitors]
for window in windows:
for name: classes in tracked_classes.items():
for cls in classes:
if cls in window['class']:
id = window['address']
self.event_handler.on_create(name, id)
self.window_ids[name].add(id)
workspace_id = window['workspace']['id']
if workspace_id < 0:
self.event_handler.on_float(name, id)
if workspace_id in open_workspaces:
self.event_handler.on_visible(name, id)
else:
self.event_handler.on_invisible(name, id)
else:
self.event_handler.on_tile(name, id)
self.event_handler.on_visible(name, id)
def listen(self):
for event in subscribe():
self.handle_event(event)
def match_window_id_to_name(self, id):
for name, ids in self.window_ids.items():
if id in ids:
return name
return None
def handle_event(self, event):
if event == EventType.MOVE_WINDOW:
id, workspace = event.split(',', 1)
if workspace.startsWith('special'):
self.even
movewindow>>2700e80,special:3

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,30 @@
from i3sway.protocol import Connection, ConnType
class Client:
conn = None
def __init__(self):
self.conn = Connection()
def __enter__(self):
return self
def __exit__(self, *details):
self.conn.close()
def cmd_on_window(self, id, cmd):
if self.conn.type == ConnType.SWAY:
criteria = '[con_id={}]'.format(id)
else:
criteria = '[id={}]'.format(id)
return self.conn.exec('{} {}'.format(criteria, cmd))
def show_window(self, id):
self.cmd_on_window(id, "focus")
def hide_window(self, id):
self.cmd_on_window(id, "move to scratchpad")
def run(self, command):
self.conn.exec("exec {}".format(command))

View file

@ -0,0 +1,112 @@
from i3sway.protocol import Connection, MessageType
class Monitor:
tracked_classes = None
cmd_sock = None
subscription_sock = None
event_handler = None
conn_type = None
def __enter__(self):
return self
def __exit__(self, *details):
self.cmd_sock.close()
self.subscription_sock.close()
def __init__(self, event_handler, tracked_classes):
self.event_handler = event_handler
self.tracked_classes = tracked_classes
self.cmd_sock = Connection()
self.subscription_sock = Connection()
tree = self.cmd_sock.request(MessageType.GET_TREE).payload
def parse_tree(root):
if root.get('window') is not None or root.get('app_id') is not None:
# Application window
for name, classes in tracked_classes.items():
if self.container_matches_classes(root, classes):
id = str(root['id'])
self.event_handler.on_create(name, id)
if root['visible']:
self.event_handler.on_visible(name, id)
if root['type'] == 'floating_con':
self.event_handler.on_float(name, id)
elif root['type'] == 'con':
self.event_handler.on_tile(name, id)
else:
for node in root['nodes']:
parse_tree(node)
for node in root['floating_nodes']:
parse_tree(node)
parse_tree(tree)
def container_matches_classes(self, container, classes):
normalize = lambda v: v.lower() if v is not None else ''
app_id = container.get('app_id')
window_props = container.get('window_properties')
instance = window_props.get('instance') if window_props is not None else None
window_class = window_props.get('class') if window_props is not None else None
app_id = normalize(app_id)
instance = normalize(instance)
window_class = normalize(window_class)
for keyword in classes:
kw = normalize(keyword)
if kw in app_id or kw in instance or kw in window_class:
return True
return False
def listen(self):
for event in self.subscription_sock.subscribe(['window']):
self.handle_event(event)
def is_window_in_scratchpad(self, id):
def scan_tree(root, is_in_scratch):
if root['id'] == id:
return is_in_scratch
is_scratch = is_in_scratch or (
root['type'] == 'workspace' and root['name'] == '__i3_scratch'
)
for node in root['nodes'] + root['floating_nodes']:
result = scan_tree(node, is_scratch)
if result is not None:
return result
tree = self.cmd_sock.request(MessageType.GET_TREE)
return scan_tree(tree.payload, False)
def handle_event(self, event):
if event.type == MessageType.window:
return self.handle_window_event(event)
def handle_window_event(self, event):
container = event['container']
for name, classes in self.tracked_classes.items():
change = event['change']
if self.container_matches_classes(container, classes):
id = str(container['id'])
if change == 'new':
self.event_handler.on_create(name, id)
elif change == 'focus':
self.event_handler.on_visible(name, id)
elif change == 'close':
self.event_handler.on_destroy(name, id)
elif change == 'move':
if container['visible']:
self.event_handler.on_visible(name, id)
else:
self.event_handler.on_invisible(name, id)
elif change == 'floating':
if container['type'] == 'floating_con':
self.event_handler.on_float(name, id)
elif container['type'] == 'con':
self.event_handler.on_tile(name, id)

View file

@ -0,0 +1,133 @@
import json
import socket
import struct
import os
from enum import Enum, IntEnum
from typing import NamedTuple, Any
class ConnType(Enum):
SWAY = 'sway'
I3 = 'i3'
SWAYSOCK = os.getenv('SWAYSOCK')
I3SOCK = os.getenv('I3SOCK')
class MessageType(IntEnum):
# Commands / Replies
RUN_COMMAND = 0
GET_WORKSPACES = 1
SUBSCRIBE = 2
GET_OUTPUTS = 3
GET_TREE = 4
GET_MARKS = 5
GET_BAR_CONFIG = 6
GET_VERSION = 7
GET_BINDING_NODES = 8
GET_CONFIG = 9
SEND_TICK = 10
SYNC = 11
GET_BINDING_STATE = 12
GET_INPUTS = 100
GET_SEATS = 101
# Events
workspace = 0x80000000
mode = 0x80000002
window = 0x80000003
barconfig_update = 0x80000004
binding = 0x80000005
shutdown = 0x80000006
tick = 0x80000007
bar_state_update = 0x80000014
input = 0x80000015
class Message(NamedTuple):
type : MessageType
payload : Any
MAGIC = 'i3-ipc'.encode('utf-8')
HEADER_FORMAT = '=6sII'
HEADER_LEN = struct.calcsize(HEADER_FORMAT)
def accept(sock):
header = sock.recvmsg(Message.HEADER_LEN)[0]
magic, len, type = struct.unpack(Message.HEADER_FORMAT, header)
if magic != Message.MAGIC:
raise ValueError('Protocol error, expected magic value {}, got magic value {}' % (Message.MAGIC, magic))
payload_buf = sock.recvmsg(len)[0]
payload = json.loads(payload_buf)
return Message(type=type, payload=payload)
def send(self, sock):
if self.payload is None:
payload_buf = bytes([])
elif isinstance(self.payload, str):
payload_buf = self.payload.encode('utf-8')
else:
payload_buf = json.dumps(self.payload).encode('utf-8')
payload_len = len(payload_buf)
header = struct.pack(Message.HEADER_FORMAT, Message.MAGIC, payload_len, self.type)
sock.sendmsg([header, payload_buf])
def __getitem__(self, key):
return self.payload[key]
class Connection:
sock: socket.socket
type: ConnType
subscription_open: bool = False
def __init__(self, type=None, sockfile=None):
self.type = type
if sockfile is None:
if type == ConnType.SWAY or (type is None and SWAYSOCK is not None):
sockfile = SWAYSOCK
self.type = ConnType.SWAY
elif type == ConnType.I3 or (type is None and I3SOCK is not None):
sockfile = I3SOCK
self.type = ConnType.I3
else:
raise ValueError('No compatible window managers found')
self.sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM,# | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC,
)
self.sock.connect(sockfile)
def __enter__(self):
return self
def __exit__(self, *details):
self.close()
def close(self):
self.sock.close()
def subscribe(self, events: list):
if self.subscription_open:
raise ValueError('Subscription already open on socket')
reply = self.request(MessageType.SUBSCRIBE, events)
if not reply['success']:
raise ValueError('Subscription failed')
self.subscription_open = True
while True:
yield Message.accept(self.sock)
def exec(self, command: str):
return self.request(MessageType.RUN_COMMAND, command)
def request(self, type: MessageType, payload: Any = None) -> Message:
if self.subscription_open:
raise ValueError('Subscription open on socket')
message = Message(type=type, payload=payload)
message.send(self.sock)
reply = Message.accept(self.sock)
if reply.type != type:
print('Incorrect reply type', reply.type)
return reply

23
home-manager/jwm/ptray/ptray Executable file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env python
from i3sway.monitor import Monitor
from config import read_config
from state import StateWriter
"""
{
"items": {
"discord": {
"match_class": [ "discord" "WebCord" ],
"exec": "webcord",
},
},
}
"""
config = read_config()
state = StateWriter(config)
class_map = {name: defn['match_classes'] for name, defn in config['items'].items()}
with Monitor(state, class_map) as wm:
wm.listen()

96
home-manager/jwm/ptray/ptrayctl Executable file
View file

@ -0,0 +1,96 @@
#!/usr/bin/env python
import os
import argparse
import inotify.adapters
from i3sway.commands import Client
from config import read_config
from state import open_file, format_path
parser = argparse.ArgumentParser(
prog='ptrayctl',
description='manages user pseudo-tray',
)
parser.add_argument('command', choices=['show', 'hide', 'watch', 'toggle'])
parser.add_argument('name', nargs='?')
parser.add_argument('-a', '--all', action='store_true', help='Include windows that are not floating')
args = parser.parse_args()
config = read_config()
def show_window(name):
with Client() as client:
is_open = False
with open_file(name, 'window_ids') as f:
for line in f:
is_open = True
client.show_window(int(line))
if not is_open:
with open_file(name, 'exec') as f:
client.run(f.read())
def hide_window(name):
if not args.all:
with open_file(name, 'tiled_ids') as f:
ignore_ids = f.read().split('\n')
else:
ignore_ids = []
with Client() as client:
with open_file(name, 'window_ids') as f:
for line in f:
if line not in ignore_ids:
client.hide_window(int(line))
def hide_all():
for name in config['items']:
hide_window(name)
def toggle_window(name):
with open_file(name, 'status') as f:
status = f.read()
if status == 'fg':
hide_window(name)
else:
show_window(name)
def watch_window(name):
i = inotify.adapters.Inotify()
i.add_watch(format_path(name, ''))
def read_status():
with open_file(name, 'status') as f:
return f.read()
status = read_status()
print(status, flush=True)
for event in i.event_gen(yield_nones=False):
(_, type_names, _, filename) = event
if 'IN_CLOSE_WRITE' in type_names and filename == 'status':
new_status = read_status()
if status != new_status:
print(new_status, flush=True)
status = new_status
if args.command == 'show':
if args.name is None:
raise ValueError('name must be specified for show commands')
show_window(args.name)
elif args.command == 'hide':
if args.name is None:
hide_all()
else:
hide_window(args.name)
elif args.command == 'toggle':
if args.name is None:
raise ValueError('name must be specified for toggle commands')
toggle_window(args.name)
elif args.command == 'watch':
if args.name is None:
raise ValueError('name must be specified for watch commands')
watch_window(args.name)

View file

@ -0,0 +1 @@
inotify

View file

@ -0,0 +1,10 @@
#!/usr/bin/env python
from distutils.core import setup
setup(
name='ptray',
version='0.1',
scripts=[ 'ptray', 'ptrayctl' ],
packages=['', 'i3sway'],
)

View file

@ -0,0 +1,85 @@
import os
from config import STATE_DIR
def format_path(name, property):
return '{}/{}/{}'.format(STATE_DIR, name, property)
def write_file(dir, basename, content):
os.makedirs('{}/{}'.format(STATE_DIR, dir), exist_ok=True)
path = format_path(dir, basename)
with open(path, 'w') as f:
f.write(content)
def open_file(dir, basename):
path = format_path(dir, basename)
return open(path, 'r')
def add_window(tracking: dict, name, window_id: str):
new_dict = tracking.copy()
new_dict[name] = new_dict.get(name, set()) | {window_id}
return new_dict
def remove_window(tracking: dict, name, window_id: str):
new_dict = tracking.copy()
new_dict[name] = new_dict.get(name, set()) - {window_id}
return new_dict
def read_config(path):
with open(path, 'r') as f:
config = json.load(f)
return config
class StateWriter:
tracked_ids = None
visible_ids = None
config = None
def __init__(self, config):
self.config = config
self.tracked_ids = {name: set() for name in config['items'].keys()}
self.visible_ids = {name: set() for name in config['items'].keys()}
self.tiled_ids = {name: set() for name in config['items'].keys()}
for name, defn in config['items'].items():
write_file(name, 'exec', defn['exec'])
write_file(name, 'status', '')
write_file(name, 'window_ids', '')
def update_window_state(self, name):
if len(self.visible_ids[name]) != 0:
write_file(name, 'status', 'fg')
elif len(self.tracked_ids[name]) != 0:
write_file(name, 'status', 'bg')
else:
write_file(name, 'status', '')
write_file(name, 'window_ids', '\n'.join(self.tracked_ids[name]))
write_file(name, 'tiled_ids', '\n'.join(self.tiled_ids[name]))
def on_visible(self, item_name, window_id, tiled=False):
self.visible_ids[item_name] = self.visible_ids[item_name] | {window_id}
self.update_window_state(item_name)
def on_invisible(self, item_name, window_id):
self.visible_ids[item_name] = self.visible_ids[item_name] - {window_id}
self.update_window_state(item_name)
def on_create(self, item_name, window_id):
self.tracked_ids[item_name] = self.tracked_ids[item_name] | {window_id}
self.update_window_state(item_name)
def on_destroy(self, item_name, window_id):
self.tracked_ids[item_name] = self.tracked_ids[item_name] - {window_id}
self.visible_ids[item_name] = self.visible_ids[item_name] - {window_id}
self.tiled_ids[item_name] = self.tiled_ids[item_name] - {window_id}
self.update_window_state(item_name)
def on_tile(self, item_name, window_id):
self.tiled_ids[item_name] = self.tiled_ids[item_name] | {window_id}
self.update_window_state(item_name)
def on_float(self, item_name, window_id):
self.tiled_ids[item_name] = self.tiled_ids[item_name] - {window_id}
self.update_window_state(item_name)