193 lines
5.3 KiB
Python
193 lines
5.3 KiB
Python
import asyncio
import logging
from typing import Dict, Optional

import discord
from discord.ext import commands

from .common import Cog
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Alias used in annotations for values that hold a user ID.
UserID = int
|
|
|
|
async def shell(command: str):
    """Run ``command`` through the system shell and return its output.

    Both standard output and standard error are captured.  The returned
    string is stdout followed by stderr, with leading and trailing
    whitespace stripped.

    The command string is handed to the shell as-is, so callers must only
    pass commands they are willing to execute verbatim.
    """
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    out, err = await process.communicate()

    # Command output is arbitrary bytes: substitute undecodable sequences
    # instead of raising UnicodeDecodeError and losing the whole result.
    decoded = ''.join(
        stream.decode('utf-8', errors='replace') for stream in (out, err)
    )
    return decoded.strip()
|
|
|
|
|
|
async def distance(s, len_s, t, len_t):
    """Return the Levenshtein distance between two string prefixes.

    Compares the first ``len_s`` characters of ``s`` with the first
    ``len_t`` characters of ``t`` and returns the minimum number of
    single-character insertions, deletions and substitutions needed to
    turn one into the other.

    The function stays a coroutine so existing ``await distance(...)``
    callers keep working, but the work is done iteratively, one row of the
    edit-distance table at a time, instead of by three-way recursion.
    """
    if not len_s:
        return len_t

    if not len_t:
        return len_s

    # previous[j] is the distance between the processed prefix of s and t[:j].
    previous = list(range(len_t + 1))
    for i in range(1, len_s + 1):
        s_char = s[i - 1]
        current = [i]
        for j in range(1, len_t + 1):
            cost = 0 if s_char == t[j - 1] else 1
            current.append(min(
                previous[j] + 1,         # delete from s
                current[j - 1] + 1,      # insert into s
                previous[j - 1] + cost,  # substitute (or match)
            ))
        previous = current

    return previous[len_t]
|
|
|
|
|
|
class Rsudo(Cog):
    """Cog that posts remote sudo requests to the command channel for admin approval."""

    # Longest chunk handed to Paginator.add_line.  Kept below the
    # paginator's per-line limit so that a single very long output line is
    # split instead of making add_line raise RuntimeError.
    _MAX_LINE_LENGTH = 1900

    def __init__(self, bot):
        super().__init__(bot)

    async def lookupuser(self, uname: str) -> Optional[UserID]:
        """Return the ID of the guild member whose name best matches ``uname``.

        Members are taken from the guild that owns the configured command
        channel; bot accounts are skipped.  A member whose name or nickname
        contains ``uname`` scores ``-1``; every other member scores the edit
        distance between their name and ``uname``.  The lowest score wins,
        ties going to the member listed first.

        Returns ``None`` when the command channel cannot be found or the
        guild has no non-bot members.
        """
        cchan = self.bot.get_channel(self.bot.config.command_channel)
        if not cchan:
            return None

        lu = len(uname)
        scores: Dict[UserID, int] = {}
        for member in cchan.guild.members:
            if member.bot:
                continue

            # member.nick is None for members without a nickname, so it has
            # to be checked before the substring test.
            in_nick = bool(member.nick) and uname in member.nick
            if uname in member.name or in_nick:
                scores[member.id] = -1
            else:
                scores[member.id] = await distance(
                    member.name, len(member.name), uname, lu
                )

        if not scores:
            return None

        # min() returns the first minimal key, matching a stable sort.
        return min(scores, key=scores.get)

    async def request(self, message, wait=False):
        """Post a sudo request to the command channel and act on the verdict.

        ``message`` follows the format ``command,uid,username``; the command
        part may itself contain commas.  An embed describing the request is
        sent to the command channel with a check mark and a cross reaction,
        and the method waits until a user holding the configured admin role
        reacts with one of them.

        Returns ``None`` without posting anything when the command channel
        cannot be found or ``message`` has no comma-separated uid/username.
        When ``wait`` is true, returns ``True`` on approval and ``False`` on
        denial without running the command.  Otherwise an approved command
        is run through ``shell`` and its output is posted to the command
        channel; the method then returns ``None``.
        """
        command_channel = self.bot.get_channel(
            self.bot.config.command_channel
        )

        if command_channel is None:
            log.warning('command channel not found')
            return

        # parse it, follows format command,uid,username
        sp = message.split(',')
        if len(sp) < 2:
            # Without this guard sp[-2] raises IndexError.
            log.warning(f'[rsudo] malformed request: {message!r}')
            return

        username = sp[-1]
        uid = sp[-2]
        command = ','.join(sp[:-2])

        log.info(f'[rsudo] {uid!r} {command!r}')

        embed = discord.Embed(
            title=f'RSudo request from UID {uid} UNAME {username!r}',
            description=f'```sh\n{command}```',
            color=discord.Color.purple(),
        )
        msg = await command_channel.send(embed=embed)

        await msg.add_reaction('\N{WHITE HEAVY CHECK MARK}')
        await msg.add_reaction('\N{CROSS MARK}')

        # Maps each accepted reaction to the verdict it stands for.
        emoji = {
            '\N{WHITE HEAVY CHECK MARK}': True,
            '\N{CROSS MARK}': False,
        }

        def check(_reaction, _user):
            """Accept only admin reactions with a verdict emoji on our message."""
            if _user is None:
                return False

            if _reaction.message.id != msg.id:
                return False

            if _reaction.emoji not in emoji:
                return False

            return discord.utils.get(
                _user.roles, id=self.bot.config.admin_role
            ) is not None

        reaction, admin = await self.bot.wait_for('reaction_add', check=check)

        try:
            await msg.clear_reactions()
        except discord.HTTPException:
            # Best effort only: the verdict is still processed below.
            log.exception('can not remove reactions')

        should_execute = emoji[reaction.emoji]

        # todo: get user ID from ldap to send results later
        user = None

        if not should_execute:
            log.info(f'not executing command for {uid}, denied by {admin} {admin.id}')

            embed.colour = discord.Color.red()
            embed.add_field(name='status', value=f'Denied by {admin} {admin.id}')
            await msg.edit(embed=embed)

            if wait:
                return False

            # TODO: notify the requester about the denial once `user` can be
            # resolved (see the ldap todo above).
            return

        log.info(f'executing command for {uid}, accepted by {admin} {admin.id}')

        async with command_channel.typing():
            embed.colour = discord.Color.green()
            embed.add_field(name='status', value=f'Accepted by {admin} {admin.id}')
            await msg.edit(embed=embed)

            if wait:
                return True

            # todo: does this not execute in the wrong directory?
            out = await shell(command)

            # output may be very long
            paginator = commands.Paginator(prefix='```sh\n', suffix='```')
            for line in out.split('\n'):
                line = line.strip()
                # max(..., 1) keeps an empty line as one empty entry.
                for start in range(0, max(len(line), 1), self._MAX_LINE_LENGTH):
                    paginator.add_line(line[start:start + self._MAX_LINE_LENGTH])

            for page in paginator.pages:
                await command_channel.send(page)

            # `user` is not resolved yet (see todo above); calling
            # user.send() on None would raise AttributeError, which the
            # HTTPException handler below does not catch.
            if user is not None:
                try:
                    for page in paginator.pages:
                        await user.send(page)
                except discord.HTTPException:
                    await command_channel.send(f'Failed to send results to {user}.')

    @commands.command()
    async def lookup(self, ctx, uname: str):
        """Show the member that best matches a username."""
        uid = await self.lookupuser(uname)
        if uid is None:
            return await ctx.send('none found')

        await ctx.send(repr(self.bot.get_user(uid)))
|
|
|
|
def setup(bot):
    """Extension entry point: create the Rsudo cog and add it to ``bot``."""
    cog = Rsudo(bot)
    bot.add_cog(cog)
|