craiyon-irc/main.py

311 lines
9.7 KiB
Python

import re
import time
import os
import random
import json
from base64 import b64decode
from typing import Optional, List
from datetime import datetime
import logging
import asyncio
from io import BytesIO
import bottom
from PIL import Image, ImageDraw, ImageFont
from PIL.JpegImagePlugin import JpegImageFile
import aiohttp
from settings import *
# Nicks whose messages are dropped before any command parsing happens.
IGNORE_NICKS = ["monerobux", "denise", "quotez"]
# Pending Task objects: filled by handle_msg(), drained by main().
TASK_QUEUE = asyncio.Queue()
# Maps nick -> number of that nick's tasks currently queued or in progress.
TASK_AUTHORS = {}
# Font used to draw the prompt text onto the background image.
# Loaded at import time from a file path relative to the working directory.
FONT = ImageFont.truetype('font-bold.ttf', 22)
# IRC client. IRC_HOST / IRC_PORT / IRC_TLS are not defined in this file;
# presumably they come from the star import of ``settings``.
bot = bottom.Client(host=IRC_HOST, port=IRC_PORT, ssl=IRC_TLS, loop=asyncio.get_event_loop())
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36'
# Request headers sent with the craiyon.com image generation request.
HEADERS = {
    'User-Agent': USER_AGENT,
    'Accept': 'application/json',
    'Accept-Language': 'en-US,en;q=0.5',
    'Referer': 'https://www.craiyon.com/',
    'Origin': 'https://www.craiyon.com',
    'Connection': 'keep-alive',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'Pragma': 'no-cache',
    'Cache-Control': 'no-cache',
}
class Task:
    """One image-generation request made by an IRC user.

    Attributes:
        author: nick of the user who asked for the image.
        channel: IRC target that replies are sent to.
        term: the prompt text sent to the image backend.
        dt: integer timestamp of the request (used in the archive filename).
    """

    def __init__(self, author: str, channel: str, term: str, dt: int):
        self.author = author
        self.channel = channel
        self.term = term
        self.dt = dt

    @staticmethod
    def _shorten(text: str, max_len: int) -> str:
        """Return *text* as-is when it fits in *max_len* chars, else cut it and append two dots."""
        if len(text) <= max_len:
            return text
        return text[:max_len] + ".."

    @property
    def term_short(self) -> str:
        """The prompt limited to 60 characters, for use in IRC replies."""
        return self._shorten(self.term, 60)

    async def process_images(self, images: List["Image.Image"]) -> "Image.Image":
        """Compose the generated images onto the mode-specific background.

        The prompt is drawn as a caption and the images are pasted in a
        three-column grid below it.

        Args:
            images: the generated images; must not be empty.

        Returns:
            The background image with caption and grid drawn onto it.

        Raises:
            Exception: if *images* is empty.
        """
        if not images:
            raise Exception("Not enough images")

        # MODE is not defined in this file; presumably it comes from settings.
        if MODE == "monero":
            bg_path, term_max, term_x_start = 'bg-pools.jpg', 60, 100
        elif MODE == "2600nl":
            bg_path, term_max, term_x_start = 'bg-2600nl.jpg', 52, 230
        else:
            bg_path, term_max, term_x_start = 'bg-wownero.jpg', 52, 230

        bg = Image.open(bg_path)
        d1 = ImageDraw.Draw(bg)
        # Same truncation rule as term_short: a prompt of exactly term_max
        # characters is drawn unchanged instead of gaining a stray "..".
        caption = self._shorten(self.term, term_max)
        d1.text((term_x_start, 160), caption, font=FONT, fill=(255, 255, 255))

        x_start, y_start = 41, 232
        columns, gap = 3, 20
        for i, img in enumerate(images):
            row, col = divmod(i, columns)
            x_offset = col * (img.width + gap)
            # Rows advance by the image height (the original used the width,
            # which is only correct for square images).
            y_offset = row * (img.height + gap)
            bg.paste(img, (x_start + x_offset, y_start + y_offset))
        return bg

    async def post_image_xmr(self, image) -> str:
        """Upload *image* to dumb.ai.xmr.pm and return the resulting URL.

        The response is expected to be JSON containing a ``redirect`` path.

        Raises:
            Exception: on any failure; the original error is chained as the cause.
        """
        url = 'https://dumb.ai.xmr.pm/paste/img'
        timeout = aiohttp.ClientTimeout(total=10)
        data = aiohttp.FormData()
        data.add_field('files[]', image, filename='lol.jpg', content_type='image/jpg')
        try:
            async with aiohttp.ClientSession(
                    timeout=timeout,
                    headers={"User-Agent": "w0w"}) as session:
                async with session.post(url, data=data) as resp:
                    blob = await resp.json()
                    return f"https://dumb.ai.xmr.pm{blob['redirect']}"
        except Exception as ex:
            raise Exception("Could not upload file to xmr.pm :((") from ex

    async def post_image_lewd(self, image) -> str:
        """Upload *image* to lewd.pics and return the resulting URL.

        The upload id is scraped from the response body with a regex.

        Raises:
            Exception: on any failure (including no id found in the
                response); the original error is chained as the cause.
        """
        timeout = aiohttp.ClientTimeout(total=5)
        data = aiohttp.FormData()
        data.add_field('fileToUpload', image, filename='lol.jpg', content_type='image/jpg')
        data.add_field('exifData', 'Yes')
        data.add_field('submiter', 'Upload')
        data.add_field('iUrl', '')
        data.add_field('iUrlb', '')
        try:
            async with aiohttp.ClientSession(
                    timeout=timeout,
                    headers={"User-Agent": "w0w"}) as session:
                async with session.post('https://lewd.pics/p/', data=data) as resp:
                    body = await resp.read()
                    re_rule = r"lewd\.pics\/p\/([a-zA-Z.]+)\""
                    blobs = re.findall(re_rule, body.decode())
                    uid = blobs[0]
                    return f"https://lewd.pics/p/{uid}"
        except Exception as ex:
            raise Exception("Could not upload file to lewd.pics :((") from ex

    async def post_image_uguu(self, image: BytesIO) -> str:
        """Upload *image* to uguu.se and return the URL of the first stored file.

        Raises:
            Exception: on any failure; the original error is chained as the cause.
        """
        url = 'https://uguu.se/upload.php'
        timeout = aiohttp.ClientTimeout(total=5)
        data = aiohttp.FormData()
        data.add_field('files[]', image, filename='lol.jpg', content_type='image/jpg')
        try:
            async with aiohttp.ClientSession(
                    timeout=timeout,
                    headers={"User-Agent": "w0w"}) as session:
                async with session.post(url, data=data) as resp:
                    blob = await resp.json()
                    return blob['files'][0]['url']
        except Exception as ex:
            raise Exception("Could not upload file to uguu.se :((") from ex

    async def get_images(self) -> List["Image.Image"]:
        """Request images for this task's prompt from the craiyon backend.

        Returns:
            The generated images, each decoded from base64 and resized to
            256x256.

        Raises:
            Exception: if the request times out, the status is not 200, the
                response has no ``images`` key, or the payload cannot be
                decoded.
        """
        url = 'https://backend.craiyon.com/generate'
        timeout_secs = 120
        timeout = aiohttp.ClientTimeout(total=timeout_secs)
        try:
            async with aiohttp.ClientSession(
                    timeout=timeout,
                    headers=HEADERS) as session:
                async with session.post(url, json={'prompt': self.term}) as resp:
                    if resp.status != 200:
                        raise Exception("POST did not return status 200")
                    blob = await resp.json()
        except asyncio.TimeoutError as ex:
            raise Exception(f"image generation timeout exceeded ({timeout_secs} secs)") from ex

        if 'images' not in blob:
            raise Exception("result did not return images")

        try:
            decoded = [Image.open(BytesIO(b64decode(raw))) for raw in blob['images']]
            return [img.resize((256, 256)) for img in decoded]
        except Exception as ex:
            raise Exception("could not decode results") from ex
@bot.on('CLIENT_CONNECT')
async def connect(**kwargs):
    """Register with the IRC server, wait for the MOTD to finish, then join channels.

    Sends NICK and USER, waits until either RPL_ENDOFMOTD or ERR_NOMOTD has
    been received, and then joins every channel in IRC_CHANNELS.
    """
    bot.send('NICK', nick=IRC_NICK)
    bot.send('USER', user=IRC_NICK, realname=IRC_NICK)

    # asyncio.wait() lost its ``loop`` argument in Python 3.10 and rejects
    # bare coroutines since 3.11, so wrap each waiter in a future first.
    waiters = [
        asyncio.ensure_future(bot.wait(event), loop=bot.loop)
        for event in ("RPL_ENDOFMOTD", "ERR_NOMOTD")
    ]
    _done, pending = await asyncio.wait(
        waiters,
        return_when=asyncio.FIRST_COMPLETED
    )
    # Only one of the two replies arrives; drop the waiter for the other.
    for future in pending:
        future.cancel()

    for c in IRC_CHANNELS:
        bot.send('JOIN', channel=c)
@bot.on('PING')
def keepalive(message, **kwargs):
    """Answer a server PING with a PONG carrying the same message."""
    bot.send('PONG', message=message)
@bot.on('client_disconnect')
async def reconnect(**kwargs):
    """Reconnect to the IRC server three seconds after the connection drops."""
    logging.warning("Lost IRC server connection")
    # Non-blocking pause: time.sleep() here would freeze the whole event
    # loop, including any image request or upload that is in flight.
    await asyncio.sleep(3)
    bot.loop.create_task(bot.connect())
    logging.warning("Reconnecting to IRC server")
@bot.on('PRIVMSG')
async def message_received(nick, target, message, **kwargs):
    """Filter an incoming PRIVMSG and pass it on to handle_msg().

    Messages from the bot itself, from ignored nicks, or for targets that
    are not in IRC_CHANNELS are dropped.
    """
    if nick == IRC_NICK or nick in IGNORE_NICKS:
        return

    # A message addressed to our own nick is a private message; answer the sender.
    reply_to = nick if target == IRC_NICK else target
    if reply_to not in IRC_CHANNELS:
        return

    # Make sure the trigger is preceded by a space, then turn any "!"
    # followed by non-word characters into a single space.
    text = message.replace("!dall", " !dall")
    text = re.sub(r'!\W+', ' ', text)

    timestamp = time.mktime(datetime.now().timetuple())
    await handle_msg(nick, text, reply_to, timestamp)
async def handle_msg(nick, msg, target, now: float):
    """Parse a ``!dall <prompt>`` command and queue a Task for it.

    Args:
        nick: nick of the sender.
        msg: message text.
        target: where replies are sent.
        now: timestamp of the message; stored on the task as an int.

    Messages containing "re @" or lacking the trigger are ignored. Unless
    SCANLINE is truthy, the trigger must be at the start of the message.
    Prompts of 8 characters or fewer are rejected, and each nick may have
    at most 4 tasks pending.
    """
    trigger = "!dall "

    def _say(text):
        return bot.send("PRIVMSG", target=target, message=text)

    if "re @" in msg or trigger not in msg:
        return
    if not SCANLINE and not msg.strip().startswith(trigger):
        return

    # Everything after the first occurrence of the trigger is the prompt.
    _, _, remainder = msg.partition(trigger)
    query = remainder.strip()
    if len(query) <= 8:
        return _say("longer query required.")

    queued = TASK_AUTHORS.setdefault(nick, 0)
    if queued >= 4:
        _say(f"{nick}: you already queued 4 thingies, patient!!11")
        return
    TASK_AUTHORS[nick] = queued + 1

    await TASK_QUEUE.put(Task(channel=target, author=nick, term=query, dt=int(now)))
    _say("generating...")
async def main():
    """Connect the bot and process queued tasks forever.

    For each task: fetch the generated images, compose them onto the
    background, archive the result under ``data/``, upload it, and post the
    URL to the task's channel. Any failure is reported to the channel and
    the loop moves on to the next task.
    """
    bot.loop.create_task(bot.connect())

    # Rendered images are archived here. Create the directory up front so a
    # missing directory cannot take the worker loop down.
    archive_dir = 'data'
    os.makedirs(archive_dir, exist_ok=True)

    def _lower_author_task_count(task_author):
        TASK_AUTHORS.setdefault(task_author, 1)
        TASK_AUTHORS[task_author] -= 1

    def _err(_task, err):
        return bot.send("PRIVMSG", target=_task.channel, message=f"{_task.author}: Error \"{_task.term_short}\"; {err}")

    # main loop
    while True:
        # Shuffle the pending tasks so the next one is picked at random
        # rather than FIFO. This reaches into the queue's private deque.
        random.shuffle(TASK_QUEUE._queue)
        task = await TASK_QUEUE.get()
        # Start timing only once a task is in hand, so the reported total
        # does not include time spent idle waiting on an empty queue.
        started = datetime.now()

        # ``stage`` labels the step in progress so one handler can report
        # where a failure happened.
        stage = "Fetch"
        try:
            images = await task.get_images()

            stage = "Process"
            image = await task.process_images(images)

            stage = "Save"
            image_bytes = BytesIO()
            image.save(image_bytes, 'JPEG')
            image.save(os.path.join(archive_dir, f"{task.dt}_{task.author}.jpg"), 'JPEG')
            image_bytes.seek(0)

            stage = "image upload"
            url = await task.post_image_xmr(image_bytes)
        except Exception as ex:
            _err(task, err=f"{stage}: {ex}")
            continue
        finally:
            # The task has left the queue whatever the outcome; free the
            # author's slot exactly once.
            _lower_author_task_count(task.author)

        completed_secs = int((datetime.now() - started).total_seconds())
        bot.send("NOTICE" if NOTICES else "PRIVMSG",
                 target=task.channel,
                 message=f"{url} \"{task.term_short}\" (total: {completed_secs}s, {task.author}) ")
        # Pause between successful generations.
        await asyncio.sleep(10)
# Script entry: run the worker on the default event loop (the bot client
# above was constructed with asyncio.get_event_loop() as well).
# main() loops forever, so this call does not return in normal operation.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())