# -*- coding: utf-8 -*- # Developer functions # Provides functions only useable for developers '''Developer Cog''' import discord from discord.ext import commands import os import aiohttp import random import datetime import collections from contextlib import redirect_stdout import traceback import time import io import inspect import textwrap import subprocess class Developer(commands.Cog): """Provides various resources for developers.""" def __init__(self, bot): # Main Stuff self.bot = bot self.request = bot.request self.emoji = "\U0001F3D7" # Repl/Eval Stuff self.repl_sessions = {} self.repl_embeds = {} self._eval = {} def _cleanup_code(self, content): """Automatically removes code blocks from the code.""" # remove ```py\n``` if content.startswith('```') and content.endswith('```'): return '\n'.join(content.split('\n')[1:-1]) # remove `foo` return content.strip('` \n') def _get_syntax_error(self, err): """Returns SyntaxError formatted for repl reply.""" return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format( err, '^', type(err).__name__) async def _post_to_hastebin(self, string): """Posts a string to hastebin.""" url = "https://hastebin.com/documents" data = string.encode('utf-8') async with self.request.post(url=url, data=data) as haste_response: haste_key = (await haste_response.json())['key'] haste_url = f"http://hastebin.com/{haste_key}" # data = {'sprunge': ''} # data['sprunge'] = string # haste_url = await self.aioclient.post(url='http://sprunge.us', # data=data) return haste_url @commands.group(name='shell', aliases=['ipython', 'repl', 'longexec'], invoke_without_command=True) async def repl(self, ctx, *, name: str = None): """Head on impact with an interactive python shell.""" # TODO Minimize local variables # TODO Minimize branches session = ctx.message.channel.id embed = discord.Embed( description="_Enter code to execute or evaluate. " "`exit()` or `quit` to exit._", timestamp=datetime.datetime.now()) embed.set_footer( text="Interactive Python Shell", icon_url="https://upload.wikimedia.org/wikipedia/commons/thumb" "/c/c3/Python-logo-notext.svg/1024px-Python-logo-notext.svg.png") if name is not None: embed.title = name.strip(" ") history = collections.OrderedDict() variables = { 'ctx': ctx, 'bot': self.bot, 'message': ctx.message, 'server': ctx.message.guild, 'channel': ctx.message.channel, 'author': ctx.message.author, 'discord': discord, '_': None } if session in self.repl_sessions: error_embed = discord.Embed( color=15746887, description="**Error**: " "_Shell is already running in channel._") await ctx.send(embed=error_embed) return shell = await ctx.send(embed=embed) self.repl_sessions[session] = shell self.repl_embeds[shell] = embed while True: response = await self.bot.wait_for( 'message', check=lambda m: m.content.startswith( '`') and m.author == ctx.author and m.channel == ctx.channel ) cleaned = self._cleanup_code(response.content) shell = self.repl_sessions[session] # Regular Bot Method try: await ctx.message.channel.fetch_message( self.repl_sessions[session].id) except discord.NotFound: new_shell = await ctx.send(embed=self.repl_embeds[shell]) self.repl_sessions[session] = new_shell embed = self.repl_embeds[shell] del self.repl_embeds[shell] self.repl_embeds[new_shell] = embed shell = self.repl_sessions[session] try: await response.delete() except discord.Forbidden: pass if len(self.repl_embeds[shell].fields) >= 7: self.repl_embeds[shell].remove_field(0) if cleaned in ('quit', 'exit', 'exit()'): self.repl_embeds[shell].color = 16426522 if self.repl_embeds[shell].title is not discord.Embed.Empty: history_string = "History for {}\n\n\n".format( self.repl_embeds[shell].title) else: history_string = "History for latest session\n\n\n" for item in history.keys(): history_string += ">>> {}\n{}\n\n".format( item, history[item]) haste_url = await self._post_to_hastebin(history_string) return_msg = "[`Leaving shell session. "\ "History hosted on hastebin.`]({})".format( haste_url) self.repl_embeds[shell].add_field( name="`>>> {}`".format(cleaned), value=return_msg, inline=False) await self.repl_sessions[session].edit( embed=self.repl_embeds[shell]) del self.repl_embeds[shell] del self.repl_sessions[session] return executor = exec if cleaned.count('\n') == 0: # single statement, potentially 'eval' try: code = compile(cleaned, '', 'eval') except SyntaxError: pass else: executor = eval if executor is exec: try: code = compile(cleaned, '', 'exec') except SyntaxError as err: self.repl_embeds[shell].color = 15746887 return_msg = self._get_syntax_error(err) history[cleaned] = return_msg if len(cleaned) > 800: cleaned = "" if len(return_msg) > 800: haste_url = await self._post_to_hastebin(return_msg) return_msg = "[`SyntaxError too big to be printed. "\ "Hosted on hastebin.`]({})".format( haste_url) self.repl_embeds[shell].add_field( name="`>>> {}`".format(cleaned), value=return_msg, inline=False) await self.repl_sessions[session].edit( embed=self.repl_embeds[shell]) continue variables['message'] = response fmt = None stdout = io.StringIO() try: with redirect_stdout(stdout): result = executor(code, variables) if inspect.isawaitable(result): result = await result except Exception as err: self.repl_embeds[shell].color = 15746887 value = stdout.getvalue() fmt = '```py\n{}{}\n```'.format( value, traceback.format_exc()) else: self.repl_embeds[shell].color = 4437377 value = stdout.getvalue() if result is not None: fmt = '```py\n{}{}\n```'.format( value, result) variables['_'] = result elif value: fmt = '```py\n{}\n```'.format(value) history[cleaned] = fmt if len(cleaned) > 800: cleaned = "" try: if fmt is not None: if len(fmt) >= 800: haste_url = await self._post_to_hastebin(fmt) self.repl_embeds[shell].add_field( name="`>>> {}`".format(cleaned), value="[`Content too big to be printed. " "Hosted on hastebin.`]({})".format( haste_url), inline=False) await self.repl_sessions[session].edit( embed=self.repl_embeds[shell]) else: self.repl_embeds[shell].add_field( name="`>>> {}`".format(cleaned), value=fmt, inline=False) await self.repl_sessions[session].edit( embed=self.repl_embeds[shell]) else: self.repl_embeds[shell].add_field( name="`>>> {}`".format(cleaned), value="`Empty response, assumed successful.`", inline=False) await self.repl_sessions[session].edit( embed=self.repl_embeds[shell]) except discord.Forbidden: pass except discord.HTTPException as err: error_embed = discord.Embed( color=15746887, description='**Error**: _{}_'.format(err)) await ctx.send(embed=error_embed) @repl.command(name='jump', aliases=['hop', 'pull', 'recenter', 'whereditgo']) async def repljump(self, ctx): """Brings the shell back down so you can see it again.""" session = ctx.message.channel.id if session not in self.repl_sessions: error_embed = discord.Embed( color=15746887, description="**Error**: _No shell running in channel._") await ctx.send(embed=error_embed) return shell = self.repl_sessions[session] embed = self.repl_embeds[shell] try: await ctx.message.delete() except discord.Forbidden: pass try: await shell.delete() except discord.errors.NotFound: pass new_shell = await ctx.send(embed=embed) self.repl_sessions[session] = new_shell del self.repl_embeds[shell] self.repl_embeds[new_shell] = embed @repl.command(name='clear', aliases=['clean', 'purge', 'cleanup', 'ohfuckme', 'deletthis']) async def replclear(self, ctx): """Clears the fields of the shell and resets the color.""" session = ctx.message.channel.id if session not in self.repl_sessions: error_embed = discord.Embed( color=15746887, description="**Error**: _No shell running in channel._") await ctx.send(embed=error_embed) return shell = self.repl_sessions[session] self.repl_embeds[shell].color = discord.Color.default() self.repl_embeds[shell].clear_fields() try: await ctx.message.delete() except discord.Forbidden: pass await shell.edit(embed=self.repl_embeds[shell]) @commands.command(name='eval') async def eval_cmd(self, ctx, *, code: str): """Evaluates Python code.""" if self._eval.get('env') is None: self._eval['env'] = {} if self._eval.get('count') is None: self._eval['count'] = 0 codebyspace = code.split(" ") print(codebyspace) silent = False if codebyspace[0] == "--silent" or codebyspace[0] == "-s": silent = True codebyspace = codebyspace[1:] code = " ".join(codebyspace) self._eval['env'].update({ 'self': self.bot, 'ctx': ctx, 'message': ctx.message, 'channel': ctx.message.channel, 'guild': ctx.message.guild, 'author': ctx.message.author, }) # let's make this safe to work with code = code.replace('```py\n', '').replace('```', '').replace('`', '') _code = ( 'async def func(self):\n try:\n{}\n ' 'finally:\n self._eval[\'env\'].update(locals())').format( textwrap.indent(code, ' ')) before = time.monotonic() # noinspection PyBroadException try: exec(_code, self._eval['env']) func = self._eval['env']['func'] output = await func(self) if output is not None: output = repr(output) except Exception as e: output = '{}: {}'.format(type(e).__name__, e) after = time.monotonic() self._eval['count'] += 1 count = self._eval['count'] code = code.split('\n') if len(code) == 1: _in = 'In [{}]: {}'.format(count, code[0]) else: _first_line = code[0] _rest = code[1:] _rest = '\n'.join(_rest) _countlen = len(str(count)) + 2 _rest = textwrap.indent(_rest, '...: ') _rest = textwrap.indent(_rest, ' ' * _countlen) _in = 'In [{}]: {}\n{}'.format(count, _first_line, _rest) message = '```py\n{}'.format(_in) if output is not None: message += '\nOut[{}]: {}'.format(count, output) ms = int(round((after - before) * 1000)) if ms > 100: # noticeable delay message += '\n# {} ms\n```'.format(ms) else: message += '\n```' try: if ctx.author.id == self.bot.user.id: await ctx.message.edit(content=message) else: if not silent: await ctx.send(message) except discord.HTTPException: if not silent: with aiohttp.ClientSession() as sesh: async with sesh.post( "https://hastebin.com/documents/", data=output, headers={"Content-Type": "text/plain"}) as r: r = await r.json() embed = discord.Embed( description=( "[View output - click]" "(https://hastebin.com/raw/{})").format( r["key"])) await ctx.send(embed=embed) @commands.command(aliases=['sys', 'sh']) async def system(self, ctx, *, command: str): """Runs system commands.""" message = await ctx.send(' Processing...') result = [] try: process = subprocess.Popen(command.split( ' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = process.communicate() except FileNotFoundError: stderr = f'Command not found: {command}' embed = discord.Embed( title="Command output" ) if len(result) >= 1 and result[0] in [None, b'']: stdout = 'No output.' if len(result) >= 2 and result[0] in [None, b'']: stderr = 'No output.' if len(result) >= 1 and result[0] not in [None, b'']: stdout = result[0].decode('utf-8') if len(result) >= 2 and result[1] not in [None, b'']: stderr = result[1].decode('utf-8') string = "" if len(result) >= 1: if (len(result[0]) >= 1024): stdout = result[0].decode('utf-8') string = string + f'[[STDOUT]]\n{stdout}' link = await self._post_to_hastebin(string) await message.edit( content=f":x: Content too long. {link}", embed=None) return if len(result) >= 2: if (len(result[1]) >= 1024): stdout = result[0].decode('utf-8') string = string + f'[[STDERR]]\n{stdout}' link = await self._post_to_hastebin(string) await message.edit( content=f":x: Content too long. {link}", embed=None) return embed.add_field( name="stdout", value=f'```{stdout}```' if 'stdout' in locals() else 'No output.', inline=False) embed.add_field( name="stderr", value=f'```{stderr}```' if 'stderr' in locals() else 'No output.', inline=False) await message.edit(content='', embed=embed) # @commands.command() # async def git(self, ctx, sub, flags=""): # """Runs some git commands in Discord.""" # if sub == "gud": # if not flags: # return await ctx.send("```You are now so gud!```") # else: # return await ctx.send( # "```{} is now so gud!```".format(flags)) # elif sub == "rekt": # if not flags: # return await ctx.send("```You got #rekt!```") # else: # return await ctx.send( # "```{} got #rekt!```".format(flags)) # else: # process_msg = await ctx.send( # " Processing...") # process = subprocess.Popen( # f"git {sub + flags}", # stdout=subprocess.PIPE, # stderr=subprocess.PIPE) # res = process.communicate() # if res[0] == b'': # content = "Successful!" # else: # content = res[0].decode("utf8") # return await process_msg.edit(content=f"```{content}```") @commands.command() async def error(self, ctx): """Makes the bot error out.""" 3 / 0 async def cog_check(self, ctx): return (ctx.author.id == self.bot.owner_id) def setup(bot): bot.add_cog(Developer(bot))