# memed/bot/ext/rsudo.py
#
# 123 lines
# 3.6 KiB
# Python

import logging
import asyncio
import discord
from discord.ext import commands
from .common import Cog
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
async def shell(command: str) -> str:
    """Run *command* through the system shell and return its combined output.

    Args:
        command: The command line to execute. It is handed to the shell
            verbatim, so shell syntax (pipes, redirects, ``;``) is honoured.

    Returns:
        Everything written to stdout followed by everything written to
        stderr, decoded as UTF-8, with surrounding whitespace stripped.
    """
    # NOTE(security): the command string reaches the shell unescaped. Callers
    # are responsible for making sure it has been vetted before it gets here.
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()

    # Commands can emit arbitrary bytes; substitute U+FFFD for anything that
    # is not valid UTF-8 rather than raising UnicodeDecodeError after the
    # command has already run.
    out = stdout.decode('utf-8', errors='replace')
    err = stderr.decode('utf-8', errors='replace')
    return f'{out}{err}'.strip()
class Rsudo(Cog):
    """Relay RSudo command requests to a Discord channel for admin approval.

    Each request is posted as an embed in the configured command channel.
    A member holding the configured admin role accepts or denies it by
    reacting to that embed; an accepted command is run through the shell and
    its output is posted back to the channel.
    """

    # Reactions offered on every request, mapped to the decision they encode.
    _DECISIONS = {
        '\N{WHITE HEAVY CHECK MARK}': True,
        '\N{CROSS MARK}': False,
    }

    # Longest piece of output handed to the paginator as one line.
    # Paginator.add_line raises RuntimeError for a line that cannot fit in a
    # single page, so longer lines are split into pieces of this size.
    _MAX_LINE_LENGTH = 1900

    def __init__(self, bot):
        super().__init__(bot)

    async def _dm_requester(self, user, texts, command_channel, failure_notice):
        """Send each of *texts* to *user* as a DM.

        Does nothing when *user* is None. If Discord rejects a DM,
        *failure_notice* is posted to *command_channel* instead.
        """
        if user is None:
            # Nobody to notify: the requester has not been resolved to a
            # Discord user. Calling .send on None would raise AttributeError.
            return
        try:
            for text in texts:
                await user.send(text)
        except discord.HTTPException:
            await command_channel.send(failure_notice)

    def _paginate(self, output):
        """Return *output* split into code-block pages sized for Discord."""
        paginator = commands.Paginator(prefix='```sh\n', suffix='```')
        width = self._MAX_LINE_LENGTH
        for line in output.split('\n'):
            line = line.strip()
            # An empty line still has to be added once, hence the fallback.
            pieces = [
                line[start:start + width]
                for start in range(0, len(line), width)
            ] or ['']
            for piece in pieces:
                paginator.add_line(piece)
        return paginator.pages

    async def create_request(self, message: str) -> None:
        """Post a request for approval and act on the admin's decision.

        Args:
            message: Request payload in the form ``command,uid``. Everything
                before the last comma is the command; the rest is the UID.

        Returns:
            None. Returns early (after logging a warning) when the configured
            command channel cannot be found.
        """
        command_channel = self.bot.get_channel(self.bot.config.command_channel)
        if command_channel is None:
            log.warning('command channel not found')
            return

        # The command itself may contain commas, so split on the last one.
        command, _, uid = message.rpartition(',')
        log.info(f'[rsudo] {uid!r} {command!r}')

        embed = discord.Embed(
            title=f'RSudo request from UID {uid}',
            description=f'```sh\n{command}```',
            color=discord.Color.purple(),
        )
        msg = await command_channel.send(embed=embed)
        for symbol in self._DECISIONS:
            await msg.add_reaction(symbol)

        def check(_reaction, _user):
            """Accept only a decision reaction on *msg* from an admin."""
            if _user is None or _reaction.message.id != msg.id:
                return False
            if _reaction.emoji not in self._DECISIONS:
                return False
            # The bot seeds both reactions itself; those must never count
            # as a decision, whatever roles the bot holds.
            if _user.id == self.bot.user.id:
                return False
            # A user object without roles cannot hold the admin role.
            roles = getattr(_user, 'roles', ())
            return discord.utils.get(roles, id=self.bot.config.admin_role) is not None

        reaction, admin = await self.bot.wait_for('reaction_add', check=check)

        try:
            await msg.clear_reactions()
        except discord.HTTPException:
            # Best effort only: the decision still stands if cleanup fails.
            log.exception('can not remove reactions')

        should_execute = self._DECISIONS[reaction.emoji]

        user = None
        # todo: get user ID from ldap to send results later

        if should_execute:
            log.info(f'executing command for {uid}, accepted by {admin} {admin.id}')

            async with command_channel.typing():
                embed.colour = discord.Color.green()
                embed.add_field(name='status', value=f'Accepted by {admin} {admin.id}')
                await msg.edit(embed=embed)

                # todo: does this not execute in the wrong directory?
                out = await shell(command)

            # output may be very long
            pages = self._paginate(out)
            for page in pages:
                await command_channel.send(page)

            await self._dm_requester(
                user,
                [f'Your request for the command {command} was accepted:', *pages],
                command_channel,
                f'Failed to send results to {user}.',
            )
        else:
            log.info(f'not executing command for {uid}, denied by {admin} {admin.id}')

            embed.colour = discord.Color.red()
            embed.add_field(name='status', value=f'Denied by {admin} {admin.id}')
            await msg.edit(embed=embed)

            await self._dm_requester(
                user,
                [f'Your request for the command {command} was denied.'],
                command_channel,
                f'Failed to send denial message to {user}.',
            )
def setup(bot):
    """Create an Rsudo cog for *bot* and register it with ``bot.add_cog``."""
    cog = Rsudo(bot)
    bot.add_cog(cog)