mirror of
https://github.com/polyjitter/searchbot-discord.git
synced 2024-08-14 22:46:55 +00:00
533 lines
18 KiB
Python
533 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Developer functions
|
|
# Provides functions only useable for developers
|
|
|
|
'''Developer Cog'''
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
import os
|
|
import aiohttp
|
|
import random
|
|
import datetime
|
|
import collections
|
|
from contextlib import redirect_stdout
|
|
import traceback
|
|
import time
|
|
import io
|
|
import inspect
|
|
import textwrap
|
|
import subprocess
|
|
|
|
|
|
class Developer(commands.Cog):
|
|
"""Provides various resources for developers."""
|
|
|
|
def __init__(self, bot):
|
|
|
|
# Main Stuff
|
|
self.bot = bot
|
|
self.request = bot.request
|
|
self.emoji = "\U0001F3D7"
|
|
|
|
# Repl/Eval Stuff
|
|
self.repl_sessions = {}
|
|
self.repl_embeds = {}
|
|
self._eval = {}
|
|
|
|
def _cleanup_code(self, content):
|
|
"""Automatically removes code blocks from the code."""
|
|
# remove ```py\n```
|
|
if content.startswith('```') and content.endswith('```'):
|
|
return '\n'.join(content.split('\n')[1:-1])
|
|
|
|
# remove `foo`
|
|
return content.strip('` \n')
|
|
|
|
def _get_syntax_error(self, err):
|
|
"""Returns SyntaxError formatted for repl reply."""
|
|
return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(
|
|
err,
|
|
'^',
|
|
type(err).__name__)
|
|
|
|
async def _post_to_hastebin(self, string):
|
|
"""Posts a string to hastebin."""
|
|
url = "https://hastebin.com/documents"
|
|
data = string.encode('utf-8')
|
|
async with self.request.post(url=url, data=data) as haste_response:
|
|
haste_key = (await haste_response.json())['key']
|
|
haste_url = f"http://hastebin.com/{haste_key}"
|
|
# data = {'sprunge': ''}
|
|
# data['sprunge'] = string
|
|
# haste_url = await self.aioclient.post(url='http://sprunge.us',
|
|
# data=data)
|
|
return haste_url
|
|
|
|
@commands.group(name='shell',
|
|
aliases=['ipython', 'repl', 'longexec'],
|
|
invoke_without_command=True)
|
|
async def repl(self, ctx, *, name: str = None):
|
|
"""Head on impact with an interactive python shell."""
|
|
# TODO Minimize local variables
|
|
# TODO Minimize branches
|
|
|
|
session = ctx.message.channel.id
|
|
|
|
embed = discord.Embed(
|
|
description="_Enter code to execute or evaluate. "
|
|
"`exit()` or `quit` to exit._",
|
|
timestamp=datetime.datetime.now())
|
|
|
|
embed.set_footer(
|
|
text="Interactive Python Shell",
|
|
icon_url="https://upload.wikimedia.org/wikipedia/commons/thumb"
|
|
"/c/c3/Python-logo-notext.svg/1024px-Python-logo-notext.svg.png")
|
|
|
|
if name is not None:
|
|
embed.title = name.strip(" ")
|
|
|
|
history = collections.OrderedDict()
|
|
|
|
variables = {
|
|
'ctx': ctx,
|
|
'bot': self.bot,
|
|
'message': ctx.message,
|
|
'server': ctx.message.guild,
|
|
'channel': ctx.message.channel,
|
|
'author': ctx.message.author,
|
|
'discord': discord,
|
|
'_': None
|
|
}
|
|
|
|
if session in self.repl_sessions:
|
|
error_embed = discord.Embed(
|
|
color=15746887,
|
|
description="**Error**: "
|
|
"_Shell is already running in channel._")
|
|
await ctx.send(embed=error_embed)
|
|
return
|
|
|
|
shell = await ctx.send(embed=embed)
|
|
|
|
self.repl_sessions[session] = shell
|
|
self.repl_embeds[shell] = embed
|
|
|
|
while True:
|
|
response = await self.bot.wait_for(
|
|
'message',
|
|
check=lambda m: m.content.startswith(
|
|
'`') and m.author == ctx.author and m.channel == ctx.channel
|
|
)
|
|
cleaned = self._cleanup_code(response.content)
|
|
shell = self.repl_sessions[session]
|
|
|
|
# Regular Bot Method
|
|
try:
|
|
await ctx.message.channel.fetch_message(
|
|
self.repl_sessions[session].id)
|
|
except discord.NotFound:
|
|
new_shell = await ctx.send(embed=self.repl_embeds[shell])
|
|
self.repl_sessions[session] = new_shell
|
|
|
|
embed = self.repl_embeds[shell]
|
|
del self.repl_embeds[shell]
|
|
self.repl_embeds[new_shell] = embed
|
|
|
|
shell = self.repl_sessions[session]
|
|
|
|
try:
|
|
await response.delete()
|
|
except discord.Forbidden:
|
|
pass
|
|
|
|
if len(self.repl_embeds[shell].fields) >= 7:
|
|
self.repl_embeds[shell].remove_field(0)
|
|
|
|
if cleaned in ('quit', 'exit', 'exit()'):
|
|
self.repl_embeds[shell].color = 16426522
|
|
|
|
if self.repl_embeds[shell].title is not discord.Embed.Empty:
|
|
history_string = "History for {}\n\n\n".format(
|
|
self.repl_embeds[shell].title)
|
|
else:
|
|
history_string = "History for latest session\n\n\n"
|
|
|
|
for item in history.keys():
|
|
history_string += ">>> {}\n{}\n\n".format(
|
|
item,
|
|
history[item])
|
|
|
|
haste_url = await self._post_to_hastebin(history_string)
|
|
return_msg = "[`Leaving shell session. "\
|
|
"History hosted on hastebin.`]({})".format(
|
|
haste_url)
|
|
|
|
self.repl_embeds[shell].add_field(
|
|
name="`>>> {}`".format(cleaned),
|
|
value=return_msg,
|
|
inline=False)
|
|
|
|
await self.repl_sessions[session].edit(
|
|
embed=self.repl_embeds[shell])
|
|
|
|
del self.repl_embeds[shell]
|
|
del self.repl_sessions[session]
|
|
return
|
|
|
|
executor = exec
|
|
if cleaned.count('\n') == 0:
|
|
# single statement, potentially 'eval'
|
|
try:
|
|
code = compile(cleaned, '<repl session>', 'eval')
|
|
except SyntaxError:
|
|
pass
|
|
else:
|
|
executor = eval
|
|
|
|
if executor is exec:
|
|
try:
|
|
code = compile(cleaned, '<repl session>', 'exec')
|
|
except SyntaxError as err:
|
|
self.repl_embeds[shell].color = 15746887
|
|
|
|
return_msg = self._get_syntax_error(err)
|
|
|
|
history[cleaned] = return_msg
|
|
|
|
if len(cleaned) > 800:
|
|
cleaned = "<Too big to be printed>"
|
|
if len(return_msg) > 800:
|
|
haste_url = await self._post_to_hastebin(return_msg)
|
|
return_msg = "[`SyntaxError too big to be printed. "\
|
|
"Hosted on hastebin.`]({})".format(
|
|
haste_url)
|
|
|
|
self.repl_embeds[shell].add_field(
|
|
name="`>>> {}`".format(cleaned),
|
|
value=return_msg,
|
|
inline=False)
|
|
|
|
await self.repl_sessions[session].edit(
|
|
embed=self.repl_embeds[shell])
|
|
continue
|
|
|
|
variables['message'] = response
|
|
|
|
fmt = None
|
|
stdout = io.StringIO()
|
|
|
|
try:
|
|
with redirect_stdout(stdout):
|
|
result = executor(code, variables)
|
|
if inspect.isawaitable(result):
|
|
result = await result
|
|
except Exception as err:
|
|
self.repl_embeds[shell].color = 15746887
|
|
value = stdout.getvalue()
|
|
fmt = '```py\n{}{}\n```'.format(
|
|
value,
|
|
traceback.format_exc())
|
|
else:
|
|
self.repl_embeds[shell].color = 4437377
|
|
|
|
value = stdout.getvalue()
|
|
|
|
if result is not None:
|
|
fmt = '```py\n{}{}\n```'.format(
|
|
value,
|
|
result)
|
|
|
|
variables['_'] = result
|
|
elif value:
|
|
fmt = '```py\n{}\n```'.format(value)
|
|
|
|
history[cleaned] = fmt
|
|
|
|
if len(cleaned) > 800:
|
|
cleaned = "<Too big to be printed>"
|
|
|
|
try:
|
|
if fmt is not None:
|
|
if len(fmt) >= 800:
|
|
haste_url = await self._post_to_hastebin(fmt)
|
|
self.repl_embeds[shell].add_field(
|
|
name="`>>> {}`".format(cleaned),
|
|
value="[`Content too big to be printed. "
|
|
"Hosted on hastebin.`]({})".format(
|
|
haste_url),
|
|
inline=False)
|
|
|
|
await self.repl_sessions[session].edit(
|
|
embed=self.repl_embeds[shell])
|
|
else:
|
|
self.repl_embeds[shell].add_field(
|
|
name="`>>> {}`".format(cleaned),
|
|
value=fmt,
|
|
inline=False)
|
|
|
|
await self.repl_sessions[session].edit(
|
|
embed=self.repl_embeds[shell])
|
|
else:
|
|
self.repl_embeds[shell].add_field(
|
|
name="`>>> {}`".format(cleaned),
|
|
value="`Empty response, assumed successful.`",
|
|
inline=False)
|
|
|
|
await self.repl_sessions[session].edit(
|
|
embed=self.repl_embeds[shell])
|
|
|
|
except discord.Forbidden:
|
|
pass
|
|
|
|
except discord.HTTPException as err:
|
|
error_embed = discord.Embed(
|
|
color=15746887,
|
|
description='**Error**: _{}_'.format(err))
|
|
await ctx.send(embed=error_embed)
|
|
|
|
@repl.command(name='jump',
|
|
aliases=['hop', 'pull', 'recenter', 'whereditgo'])
|
|
async def repljump(self, ctx):
|
|
"""Brings the shell back down so you can see it again."""
|
|
|
|
session = ctx.message.channel.id
|
|
|
|
if session not in self.repl_sessions:
|
|
error_embed = discord.Embed(
|
|
color=15746887,
|
|
description="**Error**: _No shell running in channel._")
|
|
await ctx.send(embed=error_embed)
|
|
return
|
|
|
|
shell = self.repl_sessions[session]
|
|
embed = self.repl_embeds[shell]
|
|
|
|
try:
|
|
await ctx.message.delete()
|
|
except discord.Forbidden:
|
|
pass
|
|
|
|
try:
|
|
await shell.delete()
|
|
except discord.errors.NotFound:
|
|
pass
|
|
new_shell = await ctx.send(embed=embed)
|
|
|
|
self.repl_sessions[session] = new_shell
|
|
|
|
del self.repl_embeds[shell]
|
|
self.repl_embeds[new_shell] = embed
|
|
|
|
@repl.command(name='clear',
|
|
aliases=['clean', 'purge', 'cleanup',
|
|
'ohfuckme', 'deletthis'])
|
|
async def replclear(self, ctx):
|
|
"""Clears the fields of the shell and resets the color."""
|
|
|
|
session = ctx.message.channel.id
|
|
|
|
if session not in self.repl_sessions:
|
|
error_embed = discord.Embed(
|
|
color=15746887,
|
|
description="**Error**: _No shell running in channel._")
|
|
await ctx.send(embed=error_embed)
|
|
return
|
|
|
|
shell = self.repl_sessions[session]
|
|
|
|
self.repl_embeds[shell].color = discord.Color.default()
|
|
self.repl_embeds[shell].clear_fields()
|
|
|
|
try:
|
|
await ctx.message.delete()
|
|
except discord.Forbidden:
|
|
pass
|
|
await shell.edit(embed=self.repl_embeds[shell])
|
|
|
|
@commands.command(name='eval')
|
|
async def eval_cmd(self, ctx, *, code: str):
|
|
"""Evaluates Python code."""
|
|
|
|
if self._eval.get('env') is None:
|
|
self._eval['env'] = {}
|
|
if self._eval.get('count') is None:
|
|
self._eval['count'] = 0
|
|
|
|
codebyspace = code.split(" ")
|
|
print(codebyspace)
|
|
silent = False
|
|
if codebyspace[0] == "--silent" or codebyspace[0] == "-s":
|
|
silent = True
|
|
codebyspace = codebyspace[1:]
|
|
code = " ".join(codebyspace)
|
|
|
|
self._eval['env'].update({
|
|
'self': self.bot,
|
|
'ctx': ctx,
|
|
'message': ctx.message,
|
|
'channel': ctx.message.channel,
|
|
'guild': ctx.message.guild,
|
|
'author': ctx.message.author,
|
|
})
|
|
|
|
# let's make this safe to work with
|
|
code = code.replace('```py\n', '').replace('```', '').replace('`', '')
|
|
|
|
_code = (
|
|
'async def func(self):\n try:\n{}\n '
|
|
'finally:\n self._eval[\'env\'].update(locals())').format(
|
|
textwrap.indent(code, ' '))
|
|
|
|
before = time.monotonic()
|
|
# noinspection PyBroadException
|
|
try:
|
|
exec(_code, self._eval['env'])
|
|
func = self._eval['env']['func']
|
|
output = await func(self)
|
|
|
|
if output is not None:
|
|
output = repr(output)
|
|
except Exception as e:
|
|
output = '{}: {}'.format(type(e).__name__, e)
|
|
after = time.monotonic()
|
|
self._eval['count'] += 1
|
|
count = self._eval['count']
|
|
|
|
code = code.split('\n')
|
|
if len(code) == 1:
|
|
_in = 'In [{}]: {}'.format(count, code[0])
|
|
else:
|
|
_first_line = code[0]
|
|
_rest = code[1:]
|
|
_rest = '\n'.join(_rest)
|
|
_countlen = len(str(count)) + 2
|
|
_rest = textwrap.indent(_rest, '...: ')
|
|
_rest = textwrap.indent(_rest, ' ' * _countlen)
|
|
_in = 'In [{}]: {}\n{}'.format(count, _first_line, _rest)
|
|
|
|
message = '```py\n{}'.format(_in)
|
|
if output is not None:
|
|
message += '\nOut[{}]: {}'.format(count, output)
|
|
ms = int(round((after - before) * 1000))
|
|
if ms > 100: # noticeable delay
|
|
message += '\n# {} ms\n```'.format(ms)
|
|
else:
|
|
message += '\n```'
|
|
|
|
try:
|
|
if ctx.author.id == self.bot.user.id:
|
|
await ctx.message.edit(content=message)
|
|
else:
|
|
if not silent:
|
|
await ctx.send(message)
|
|
except discord.HTTPException:
|
|
if not silent:
|
|
with aiohttp.ClientSession() as sesh:
|
|
async with sesh.post(
|
|
"https://hastebin.com/documents/",
|
|
data=output,
|
|
headers={"Content-Type": "text/plain"}) as r:
|
|
r = await r.json()
|
|
embed = discord.Embed(
|
|
description=(
|
|
"[View output - click]"
|
|
"(https://hastebin.com/raw/{})").format(
|
|
r["key"]))
|
|
await ctx.send(embed=embed)
|
|
|
|
@commands.command(aliases=['sys', 'sh'])
|
|
async def system(self, ctx, *, command: str):
|
|
"""Runs system commands."""
|
|
|
|
message = await ctx.send('<a:loading:393852367751086090> Processing...')
|
|
result = []
|
|
try:
|
|
process = subprocess.Popen(command.split(
|
|
' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
result = process.communicate()
|
|
except FileNotFoundError:
|
|
stderr = f'Command not found: {command}'
|
|
embed = discord.Embed(
|
|
title="Command output"
|
|
)
|
|
if len(result) >= 1 and result[0] in [None, b'']:
|
|
stdout = 'No output.'
|
|
if len(result) >= 2 and result[0] in [None, b'']:
|
|
stderr = 'No output.'
|
|
if len(result) >= 1 and result[0] not in [None, b'']:
|
|
stdout = result[0].decode('utf-8')
|
|
if len(result) >= 2 and result[1] not in [None, b'']:
|
|
stderr = result[1].decode('utf-8')
|
|
string = ""
|
|
if len(result) >= 1:
|
|
if (len(result[0]) >= 1024):
|
|
stdout = result[0].decode('utf-8')
|
|
string = string + f'[[STDOUT]]\n{stdout}'
|
|
link = await self._post_to_hastebin(string)
|
|
await message.edit(
|
|
content=f":x: Content too long. {link}",
|
|
embed=None)
|
|
return
|
|
if len(result) >= 2:
|
|
if (len(result[1]) >= 1024):
|
|
stdout = result[0].decode('utf-8')
|
|
string = string + f'[[STDERR]]\n{stdout}'
|
|
link = await self._post_to_hastebin(string)
|
|
await message.edit(
|
|
content=f":x: Content too long. {link}",
|
|
embed=None)
|
|
return
|
|
embed.add_field(
|
|
name="stdout",
|
|
value=f'```{stdout}```' if 'stdout' in locals() else 'No output.',
|
|
inline=False)
|
|
embed.add_field(
|
|
name="stderr",
|
|
value=f'```{stderr}```' if 'stderr' in locals() else 'No output.',
|
|
inline=False)
|
|
await message.edit(content='', embed=embed)
|
|
|
|
# @commands.command()
|
|
# async def git(self, ctx, sub, flags=""):
|
|
# """Runs some git commands in Discord."""
|
|
|
|
# if sub == "gud":
|
|
# if not flags:
|
|
# return await ctx.send("```You are now so gud!```")
|
|
# else:
|
|
# return await ctx.send(
|
|
# "```{} is now so gud!```".format(flags))
|
|
# elif sub == "rekt":
|
|
# if not flags:
|
|
# return await ctx.send("```You got #rekt!```")
|
|
# else:
|
|
# return await ctx.send(
|
|
# "```{} got #rekt!```".format(flags))
|
|
# else:
|
|
# process_msg = await ctx.send(
|
|
# "<a:typing:401162479041773568> Processing...")
|
|
# process = subprocess.Popen(
|
|
# f"git {sub + flags}",
|
|
# stdout=subprocess.PIPE,
|
|
# stderr=subprocess.PIPE)
|
|
# res = process.communicate()
|
|
# if res[0] == b'':
|
|
# content = "Successful!"
|
|
# else:
|
|
# content = res[0].decode("utf8")
|
|
# return await process_msg.edit(content=f"```{content}```")
|
|
|
|
@commands.command()
|
|
async def error(self, ctx):
|
|
"""Makes the bot error out."""
|
|
|
|
3 / 0
|
|
|
|
async def cog_check(self, ctx):
|
|
return (ctx.author.id == self.bot.owner_id)
|
|
|
|
|
|
def setup(bot):
|
|
bot.add_cog(Developer(bot))
|