"""
MemeD - the MEMEwork Daemon

This program manages the memework vps' command logging.

Wire format (both directions): an 8-byte header packed with the native
``struct`` format ``'Ii'`` (payload length in bytes, then opcode), followed
by the UTF-8 encoded payload.
"""
import asyncio
import struct
import json
import logging

import asyncpg

import config

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

# Set by the ``__main__`` section below; used by process().
db = None

# Message header: unsigned payload length, signed opcode (native layout).
_HEADER = struct.Struct('Ii')


def parse_logstr(string):
    """Extract ``(uid, cwd, command)`` from a snoopy log line.

    Example input::

        2015-02-11T19:05:10+00:00 labrat-1 snoopy[896]: [uid:0 sid:11679
        tty:/dev/pts/2 cwd:/root filename:/usr/bin/cat]: cat /etc/fstab.BAK

    which yields ``('0', '/root', 'cat /etc/fstab.BAK')``.

    The metadata block is the second ``[...]`` group of the line (the first
    one is the ``snoopy[pid]`` tag); everything after its closing ``]:`` is
    the command.

    Returns:
        A tuple of three strings: uid, cwd and the stripped command.

    Raises:
        ValueError: if the line has no metadata block, or the block lacks
            a ``uid`` or ``cwd`` field.
    """
    tag_open = string.find('[')
    fields_open = string.find('[', tag_open + 1)
    fields_close = string.find(']:', fields_open + 1)
    if tag_open == -1 or fields_open == -1 or fields_close == -1:
        raise ValueError(f'malformed snoopy log line: {string!r}')

    # Take the whole remainder as the command so that colons inside it
    # (URLs, ``host:path`` arguments, ...) are preserved.
    command = string[fields_close + 2:].strip()

    fields = {}
    for token in string[fields_open + 1:fields_close].split():
        # Split on the first colon only, so values such as paths may
        # themselves contain colons.
        key, sep, value = token.partition(':')
        if sep:
            # First occurrence of a key wins.
            fields.setdefault(key, value)

    try:
        return fields['uid'], fields['cwd'], command
    except KeyError as err:
        raise ValueError(
            f'snoopy log line is missing field {err}: {string!r}'
        ) from err


async def read_msg(reader):
    """Read one framed message from *reader*.

    Returns:
        ``(op, data)`` where *op* is the integer opcode and *data* is the
        decoded payload string.

    Raises:
        asyncio.IncompleteReadError: if the stream ends before a complete
            header and payload were received (``partial`` is empty when the
            peer closed cleanly between messages).
    """
    # readexactly() keeps reading until all bytes arrived; a plain read(n)
    # may legally return fewer bytes than requested.
    header = await reader.readexactly(_HEADER.size)
    length, op = _HEADER.unpack(header)

    data = (await reader.readexactly(length)).decode()

    log.info('[recv] %d %d %s', length, op, data)
    return op, data


async def read_payload(reader):
    """Read one message, JSON-decoding the payload when ``op > 10``.

    Returns:
        ``(op, payload)``; *payload* is the parsed JSON value for opcodes
        above 10 and the raw string otherwise.
    """
    op, message = await read_msg(reader)
    if op > 10:
        return op, json.loads(message)
    return op, message


async def send_msg(writer, op: int, data: str):
    """Frame *data* with opcode *op*, write it to *writer* and drain."""
    payload = data.encode()
    # The header is binary and must be concatenated as bytes: it is not
    # valid UTF-8 in general (e.g. op == -1 packs to ff ff ff ff). The
    # length is that of the encoded payload, which is what the reader
    # consumes.
    msg = _HEADER.pack(len(payload), op) + payload

    log.info('[send] %d, %s -> %r', op, data, msg)
    writer.write(msg)
    await writer.drain()


async def process(op: int, message: str):
    """Handle one received message.

    Opcode 1 carries a snoopy log line, which is parsed and inserted into
    the ``logs`` table. Other opcodes are ignored.
    """
    if op == 1:
        uid, cwd, command = parse_logstr(message)

        await db.execute("""
        INSERT INTO logs (uid, cwd, cmd)
        VALUES ($1, $2, $3)
        """, uid, cwd, command)


async def handle_echo(reader, writer):
    """Serve one client connection.

    Sends a ``hello`` message (op 0), then processes incoming messages
    until the peer disconnects. On any other failure the error is logged,
    reported to the client with op -1 on a best-effort basis, and the
    connection is closed.
    """
    addr = writer.get_extra_info('peername')
    try:
        await send_msg(writer, 0, 'hello')

        while True:
            try:
                op, message = await read_msg(reader)
            except asyncio.IncompleteReadError as err:
                # An empty partial read is a normal disconnect between
                # messages; anything else is a truncated message.
                if err.partial:
                    log.warning('connection from %s closed mid-message',
                                addr)
                break

            log.debug('received %r from %s', message, addr)
            await process(op, message)
    except Exception as e:
        log.exception('error at handler coro')
        try:
            await send_msg(writer, -1, repr(e))
        except OSError:
            # The peer may already be gone; nothing more to report to.
            log.debug('could not deliver error to %s', addr)
    finally:
        writer.close()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    db = loop.run_until_complete(asyncpg.create_pool(**config.db))

    # No ``loop=`` argument: it was removed from the streams API in
    # Python 3.10 and the running loop is used automatically.
    server = loop.run_until_complete(
        asyncio.start_unix_server(handle_echo, './log.suck'))

    log.info('Serving on %s', server.sockets[0].getsockname())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    log.info('Closing server')
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.run_until_complete(db.close())
    loop.close()