memed/bot/ext/rsudo.py

194 lines
5.3 KiB
Python
Raw Normal View History

2017-12-02 14:24:55 +00:00
import logging
2017-12-02 17:40:19 +00:00
import asyncio
2017-12-03 19:00:47 +00:00
from typing import Dict, Optional
2017-12-02 14:24:55 +00:00
import discord
2017-12-02 20:32:50 +00:00
from discord.ext import commands
2017-12-02 14:24:55 +00:00
from .common import Cog
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
2017-12-03 19:00:47 +00:00
# Readability alias for annotations; lookupuser() works with member IDs,
# which are plain integers.
UserID = int
2017-12-02 14:24:55 +00:00
2017-12-02 17:40:19 +00:00
async def shell(command: str):
    """Run *command* through the system shell and return its combined output.

    Both stdout and stderr are captured. The returned string is stdout
    followed by stderr, with leading and trailing whitespace removed.

    Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
    raising ``UnicodeDecodeError``, since arbitrary commands may emit binary
    or differently-encoded output.
    """
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await process.communicate()
    out = raw_out.decode('utf-8', errors='replace')
    err = raw_err.decode('utf-8', errors='replace')
    return f'{out}{err}'.strip()
2017-12-03 19:00:47 +00:00
async def distance(s, len_s, t, len_t):
    """Return the Levenshtein distance between ``s[:len_s]`` and ``t[:len_t]``.

    The distance is the minimum number of single-character insertions,
    deletions and substitutions needed to turn one prefix into the other.

    This is computed with the two-row dynamic-programming table, which takes
    ``len_s * len_t`` steps. The naive three-way recursion is exponential and
    becomes unusable for strings of even moderate length.

    The function stays a coroutine so existing ``await distance(...)`` call
    sites keep working.
    """
    if not len_s:
        return len_t
    if not len_t:
        return len_s

    # previous[j] holds the distance between s[:i - 1] and t[:j].
    previous = list(range(len_t + 1))
    for i in range(1, len_s + 1):
        s_char = s[i - 1]
        current = [i]
        for j in range(1, len_t + 1):
            cost = 0 if s_char == t[j - 1] else 1
            current.append(min(
                previous[j] + 1,         # delete from s
                current[j - 1] + 1,      # insert into s
                previous[j - 1] + cost,  # substitute (or match)
            ))
        previous = current
    return previous[len_t]
2017-12-02 14:24:55 +00:00
class Rsudo(Cog):
    """Cog that posts remote sudo requests to Discord for admin approval.

    A request is shown as an embed in the configured command channel. A
    member holding the configured admin role approves or denies it by
    reacting to that embed; approved commands are run with ``shell`` and
    their output is posted back to the channel.
    """

    # Longest piece of an output line handed to the paginator at once. Kept
    # below Discord's 2000-character message limit minus the code-block
    # prefix/suffix so Paginator.add_line never raises RuntimeError.
    _MAX_LINE_LENGTH = 1900

    def __init__(self, bot):
        super().__init__(bot)

    @staticmethod
    def _split_line(line, size):
        """Split *line* into pieces of at most *size* characters.

        Always returns at least one piece, so empty lines are preserved.
        """
        return [line[i:i + size] for i in range(0, len(line), size)] or ['']

    async def lookupuser(self, uname: str) -> Optional[UserID]:
        """Return the ID of the non-bot guild member best matching *uname*.

        Members are taken from the guild that owns the configured command
        channel. A member whose name or nickname contains *uname* is
        preferred over everyone else; the remaining members are ranked by
        the edit distance between their name and *uname*.

        Returns ``None`` when the command channel cannot be found or the
        guild has no non-bot members.
        """
        cchan = self.bot.get_channel(
            self.bot.config.command_channel
        )
        if not cchan:
            return None

        scores = {}
        for member in cchan.guild.members:
            if member.bot:
                continue
            # ``nick`` is None for members without a guild nickname, so it
            # must not be used directly as the right-hand side of ``in``.
            if uname in member.name or uname in (member.nick or ''):
                scores[member.id] = -1
            else:
                scores[member.id] = await distance(
                    member.name, len(member.name), uname, len(uname)
                )

        if not scores:
            return None
        # Lowest score wins; ties resolve to the member seen first.
        return min(scores, key=scores.get)

    async def request(self, message, wait=False):
        """Ask the admins to approve a command and act on their decision.

        *message* has the form ``command,uid,username``. The command itself
        may contain commas; only the last two fields are split off.

        The request is posted to the command channel with a check-mark and a
        cross reaction. The first of those reactions added by a member with
        the configured admin role decides the outcome.

        When *wait* is true the command is not executed here; the method
        returns ``True`` if the request was approved and ``False`` if it was
        denied. When *wait* is false an approved command is run and its
        output is posted to the command channel, and the method returns
        ``None``.

        ``None`` is also returned, after logging a warning, when the command
        channel cannot be found or *message* is malformed.
        """
        command_channel = self.bot.get_channel(
            self.bot.config.command_channel
        )

        if command_channel is None:
            log.warning('command channel not found')
            return

        # parse it, follows format command,uid,username
        sp = message.split(',')
        if len(sp) < 2:
            log.warning(f'malformed rsudo request: {message!r}')
            return
        username = sp[-1]
        uid = sp[-2]
        command = ','.join(sp[:-2])

        log.info(f'[rsudo] {uid!r} {command!r}')

        embed = discord.Embed(
            title=f'RSudo request from UID {uid} UNAME {username!r}',
            description=f'```sh\n{command}```',
            color=discord.Color.purple(),
        )
        msg = await command_channel.send(embed=embed)

        await msg.add_reaction('\N{WHITE HEAVY CHECK MARK}')
        await msg.add_reaction('\N{CROSS MARK}')

        emoji = {
            '\N{WHITE HEAVY CHECK MARK}': True,
            '\N{CROSS MARK}': False,
        }

        def check(_reaction, _user):
            """Accept only a decision reaction on our message by an admin."""
            if _user is None:
                return False
            if _reaction.message.id != msg.id:
                return False
            if _reaction.emoji not in emoji:
                return False
            # A reacting user without role information cannot be an admin;
            # raising here would abort the pending wait_for.
            roles = getattr(_user, 'roles', ())
            return discord.utils.get(roles, id=self.bot.config.admin_role) is not None

        reaction, admin = await self.bot.wait_for('reaction_add', check=check)

        # Best effort: failing to tidy up the reactions must not block the
        # decision, but cancellation should still propagate.
        try:
            await msg.clear_reactions()
        except Exception:
            log.exception('can not remove reactions')

        should_execute = emoji.get(reaction.emoji)

        # todo: get user ID from ldap to send results later
        user = None

        if not should_execute:
            log.info(f'not executing command for {uid}, denied by {admin} {admin.id}')
            embed.colour = discord.Color.red()
            embed.add_field(name='status', value=f'Denied by {admin} {admin.id}')
            await msg.edit(embed=embed)
            if wait:
                return False
            # todo: notify the requester of the denial once ``user`` can be
            # resolved (see above).
            return

        log.info(f'executing command for {uid}, accepted by {admin} {admin.id}')
        async with command_channel.typing():
            embed.colour = discord.Color.green()
            embed.add_field(name='status', value=f'Accepted by {admin} {admin.id}')
            await msg.edit(embed=embed)

            if wait:
                return True

            # todo: does this not execute in the wrong directory?
            out = await shell(command)

            # output may be very long
            paginator = commands.Paginator(prefix='```sh\n', suffix='```')
            for line in out.split('\n'):
                for piece in self._split_line(line.strip(), self._MAX_LINE_LENGTH):
                    paginator.add_line(piece)

            for page in paginator.pages:
                await command_channel.send(page)

            # The requester is not resolved yet, so there is nobody to send
            # the results to until the lookup above is implemented.
            if user is not None:
                try:
                    for page in paginator.pages:
                        await user.send(page)
                except discord.HTTPException:
                    await command_channel.send(f'Failed to send results to {user}.')

    @commands.command()
    async def lookup(self, ctx, uname: str):
        """Look up the member whose name best matches the given username."""
        uid = await self.lookupuser(uname)
        if uid is None:
            return await ctx.send('none found')
        await ctx.send(repr(self.bot.get_user(uid)))
2017-12-02 14:24:55 +00:00
def setup(bot):
    """Create an Rsudo cog for *bot* and add it to the bot."""
    cog = Rsudo(bot)
    bot.add_cog(cog)