memed/bot/ext/rsudo.py
2017-12-03 16:00:47 -03:00

185 lines
5.1 KiB
Python

import logging
import asyncio
from typing import Dict
import discord
from discord.ext import commands
from .common import Cog
log = logging.getLogger(__name__)
UserID = int
async def shell(command: str):
process = await asyncio.create_subprocess_shell(
command,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
)
out, err = map(lambda s: s.decode('utf-8'), await process.communicate())
return f'{out}{err}'.strip()
async def distance(s, len_s, t, len_t):
cost = 0
if not len_s:
return len_t
if not len_t:
return len_s
if s[len_s - 1] == t[len_t - 1]:
cost = 0
else:
cost = 1
l = [
(await distance(s, len_s - 1, t, len_t)) + 1,
(await distance(s, len_s, t, len_t - 1)) + 1,
(await distance(s, len_s - 1, t, len_t - 1)) + cost
]
return min(l)
class Rsudo(Cog):
def __init__(self, bot):
super().__init__(bot)
async def lookupuser(self, uname: str) -> Dict[UserID, float]:
lu = len(uname)
cchan = self.bot.get_channel(
self.bot.config.command_channel
)
if not cchan:
return None
members = cchan.guild.members
scores = {}
for member in members:
if member.bot: continue
lmn = len(member.name)
d = await distance(member.name, lmn, uname, lu)
scores[member.id] = d
if uname in member.name or uname in member.nick:
scores[member.id] = -1
users = sorted(scores.keys(), key=lambda k: scores[k])
if not users:
return None
return users[0]
async def create_request(self, message):
command_channel = self.bot.get_channel(
self.bot.config.command_channel
)
if command_channel is None:
log.warning('command channel not found')
return
# parse it, follows format command,uid,username
sp = message.split(',')
username = sp[-1]
uid = sp[-2]
command = ','.join(sp[:-2])
log.info(f'[rsudo] {uid!r} {command!r}')
embed = discord.Embed(
title=f'RSudo request from UID {uid} UNAME {username!r}',
description=f'```sh\n{command}```',
color=discord.Color.purple(),
)
msg = await command_channel.send(embed=embed)
await msg.add_reaction('\N{WHITE HEAVY CHECK MARK}')
await msg.add_reaction('\N{CROSS MARK}')
emoji = {
'\N{WHITE HEAVY CHECK MARK}': True,
'\N{CROSS MARK}': False,
}
def check(_reaction, _user):
if _user is None:
return False
if _reaction.message.id != msg.id:
return
if _reaction.emoji not in emoji.keys():
return False
return discord.utils.get(_user.roles, id=self.bot.config.admin_role) is not None
reaction, admin = await self.bot.wait_for('reaction_add', check=check)
try:
await msg.clear_reactions()
except:
log.error('can not remove reactions')
should_execute = emoji.get(reaction.emoji)
user = None
# todo: get user ID from ldap to send results later
if should_execute:
log.info(f'executing command for {uid}, accepted by {admin} {admin.id}')
async with command_channel.typing():
embed.colour = discord.Color.green()
embed.add_field(name='status', value=f'Accepted by {admin} {admin.id}')
await msg.edit(embed=embed)
# todo: does this not execute in the wrong directory?
out = await shell(command)
# output may be very long
paginator = commands.Paginator(prefix='```sh\n', suffix='```')
for line in out.split('\n'):
paginator.add_line(line.strip())
for page in paginator.pages:
await command_channel.send(page)
try:
await user.send(f'Your request for the command {command} was accepted:')
for page in paginator.pages:
await user.send(page)
except discord.HTTPException:
await command_channel.send(f'Failed to send results to {user}.')
else:
log.info(f'not executing command for {uid}, denied by {admin} {admin.id}')
embed.colour = discord.Color.red()
embed.add_field(name='status', value=f'Denied by {admin} {admin.id}')
await msg.edit(embed=embed)
try:
await user.send(f'Your request for the command {command} was denied.')
except discord.HTTPException:
await command_channel.send(f'Failed to send denial message to {user}.')
@commands.command()
async def lookup(self, ctx, uname: str):
u = await self.lookupuser(uname)
if not u:
return await ctx.send('none found')
u = self.bot.get_user(u)
await ctx.send(repr(u))
def setup(bot):
bot.add_cog(Rsudo(bot))