add LibCal functionality
This commit is contained in:
parent
bae52e7d7e
commit
a5b5189e52
7 changed files with 341 additions and 71 deletions
104
ext/libcal.py
Normal file
104
ext/libcal.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import traceback
|
||||
from hikari.files import Resource
|
||||
|
||||
import miru
|
||||
import hikari
|
||||
import lightbulb
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from lib.config import load_config
|
||||
from lib.gcal import GoogleCalendarAPI
|
||||
from lib.room import Room
|
||||
from lib.ocr import get_room_data, NoMatchException
|
||||
|
||||
plugin = lightbulb.Plugin("LibCal")
|
||||
config = load_config()
|
||||
|
||||
def add_rooms_to_embed(room_list: list[Room], embed: hikari.Embed):
|
||||
for room in room_list:
|
||||
embed.add_field(room.get_time_str(), room.number)
|
||||
|
||||
class BookingConfirmationView(miru.View):
|
||||
def __init__(self, *, timeout: Optional[float] = 120, autodefer: bool = True, room_list: list[Room] = [], image: Resource | None = None) -> None:
|
||||
self.room_list = room_list
|
||||
if image is not None:
|
||||
self.image = image
|
||||
else:
|
||||
self.image = None
|
||||
super().__init__(timeout=timeout, autodefer=autodefer)
|
||||
|
||||
@miru.button(label="Yes", style=hikari.ButtonStyle.SUCCESS)
|
||||
async def confirm_button(self, button: miru.Button, ctx: miru.Context) -> None:
|
||||
gcal = GoogleCalendarAPI()
|
||||
for room in self.room_list:
|
||||
await gcal.create_event(room.number, room.start_time, room.end_time)
|
||||
embed = hikari.Embed(title="Rooms Booked!")
|
||||
add_rooms_to_embed(self.room_list, embed)
|
||||
await ctx.edit_response(embed=embed, components=[])
|
||||
|
||||
thread_embed = hikari.Embed(title="Rooms were booked!")
|
||||
thread_embed.set_author(name=ctx.user.username, icon=ctx.user.avatar_url)
|
||||
thread_embed.set_image(self.image)
|
||||
add_rooms_to_embed(self.room_list, thread_embed)
|
||||
await ctx.app.rest.create_message(config.discord_study_room_thread, embed=thread_embed)
|
||||
self.stop()
|
||||
|
||||
@miru.button(label="No", style=hikari.ButtonStyle.DANGER)
|
||||
async def cancel_button(self, button: miru.Button, ctx: miru.Context) -> None:
|
||||
embed = hikari.Embed(title="Aborted!")
|
||||
await ctx.edit_response(embed=embed, components=[])
|
||||
self.stop()
|
||||
|
||||
@plugin.command
|
||||
@lightbulb.option("img", "image to check for events", type=hikari.Attachment, required=True)
|
||||
@lightbulb.command("book", "add room bookings to Google Calendar", ephemeral=True)
|
||||
@lightbulb.implements(lightbulb.SlashCommand)
|
||||
async def book(ctx: lightbulb.Context) -> None:
|
||||
response = hikari.Embed()
|
||||
image = await ctx.options.img.read()
|
||||
room_list = get_room_data(image)
|
||||
add_rooms_to_embed(room_list, response)
|
||||
response.title = "Room Bookings"
|
||||
response.description = "Are the booking(s) correct?"
|
||||
view = BookingConfirmationView(room_list=room_list, image=ctx.options.img)
|
||||
resp = await ctx.respond(response, components=view.build())
|
||||
view.start(await resp.message())
|
||||
|
||||
|
||||
@book.set_error_handler
|
||||
async def book_error_handler(event: lightbulb.CommandErrorEvent):
|
||||
print("ERROR TYPE:", type(event.exception))
|
||||
if isinstance(event.exception.__cause__, NoMatchException):
|
||||
embed = hikari.Embed(title="Huh, I can't read that", description="Could you try another picture?")
|
||||
await event.context.respond(embed=embed)
|
||||
else:
|
||||
embed = hikari.Embed(
|
||||
title="Booking Error", description="Whelp, better luck next time I guess... the images used are attached.")
|
||||
embed.add_field("Traceback", traceback.format_exc())
|
||||
await event.context.respond(embed=embed, attachments=event.context.attachments)
|
||||
|
||||
|
||||
@plugin.command
|
||||
@lightbulb.command("rooms", "View today's rooms in Google Calendar.", ephemeral=True)
|
||||
@lightbulb.implements(lightbulb.SlashCommand)
|
||||
async def rooms(ctx: lightbulb.Context) -> None:
|
||||
gcal = GoogleCalendarAPI()
|
||||
events = await gcal.get_events()
|
||||
|
||||
room_list = [Room.from_event_dict(event) for event in events]
|
||||
embed = hikari.Embed(title="Today's Rooms!")
|
||||
if len(room_list) == 0:
|
||||
embed.description = "Looks like there's no rooms booked today. :("
|
||||
for room in room_list[::-1]:
|
||||
embed.add_field(
|
||||
f"{room.start_time.strftime('%H:%M')} - {room.end_time.strftime('%H:%M')}", room.number)
|
||||
await ctx.respond(embed)
|
||||
|
||||
|
||||
def load(bot: lightbulb.BotApp):
|
||||
bot.add_plugin(plugin)
|
||||
|
||||
|
||||
def unload(bot: lightbulb.BotApp):
|
||||
bot.remove_plugin(plugin)
|
128
ext/system.py
128
ext/system.py
|
@ -10,88 +10,76 @@ config = load_config()
|
|||
|
||||
plugin = lightbulb.Plugin("SystemPlugin")
|
||||
|
||||
async def create_subprocess(*args):
|
||||
proc = await asyncio.create_subprocess_exec(*args,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, _ = await proc.communicate()
|
||||
return stdout.decode("utf-8").strip()
|
||||
|
||||
async def get_git_latest_commit_id():
|
||||
return await create_subprocess("git", "rev-parse", "--short", "HEAD")
|
||||
|
||||
async def get_git_branch():
|
||||
return await create_subprocess("git", "rev-parse", "--abbrev-ref", "HEAD")
|
||||
|
||||
async def get_git_remote():
|
||||
return await create_subprocess("git", "branch", "-r")
|
||||
|
||||
async def get_git_head_diff_branch(branch: str) -> str:
|
||||
# diff HEAD to remote
|
||||
return await create_subprocess("git", "diff", "head", branch)
|
||||
|
||||
async def get_git_index_diff_branch(branch: str) -> str:
|
||||
# diff index to remote
|
||||
return await create_subprocess("git", "diff", branch)
|
||||
|
||||
async def get_git_commits_ahead_behind(branch: str, remote: str) -> tuple[int, int]:
|
||||
# commits ahead/behind remote
|
||||
output = await create_subprocess("git", "rev-list", "--left-right", "--count", branch+"..."+remote)
|
||||
commits_ahead = int(output.split()[0])
|
||||
commits_behind = int(output.split()[1])
|
||||
return commits_ahead, commits_behind
|
||||
|
||||
async def get_git_dev(remote: str, branch: str) -> bool:
|
||||
# dev environment tests
|
||||
if await get_git_head_diff_branch(remote) != "": # difference between HEAD and remote
|
||||
return True
|
||||
elif config.git_branch != branch: # branch has been changed
|
||||
return True
|
||||
elif await get_git_index_diff_branch(remote) != "": # difference between index and remote
|
||||
return True
|
||||
elif (await get_git_commits_ahead_behind(branch, remote))[0] > 0: # ahead in commits
|
||||
return True
|
||||
return False
|
||||
|
||||
async def get_git_update(dev_status: bool, branch, remote) -> bool:
|
||||
commits_behind, _ = await get_git_commits_ahead_behind(branch, remote)
|
||||
if not dev_status and commits_behind > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def get_git_status() -> dict:
|
||||
output = {}
|
||||
try:
|
||||
id_proc = await asyncio.create_subprocess_exec("git", "rev-parse", "--short", "HEAD",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
stdout, _ = await id_proc.communicate()
|
||||
output["commit_id"] = stdout.decode("utf-8")
|
||||
|
||||
br_proc = await asyncio.create_subprocess_exec("git", "rev-parse", "--abbrev-ref", "HEAD",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
stdout, _ = await br_proc.communicate()
|
||||
output["branch"] = stdout.decode("utf-8").strip()
|
||||
|
||||
rm_proc = await asyncio.create_subprocess_exec("git", "branch", "-r",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
stdout, _ = await rm_proc.communicate()
|
||||
output['remote'] = "remotes/"+stdout.decode("utf-8").lstrip().split("\n")[0]
|
||||
|
||||
# diff HEAD to remote
|
||||
hd_proc = await asyncio.create_subprocess_exec("git", "diff", "HEAD", output['remote'],
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
# diff index to remote
|
||||
ix_proc = await asyncio.create_subprocess_exec("git", "diff", output['remote'],
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, _ = await ix_proc.communicate()
|
||||
ix_diff = stdout.decode("utf-8")
|
||||
|
||||
# commits ahead/behind remote
|
||||
ab_proc = await asyncio.create_subprocess_exec("git", "rev-list", "--left-right", "--count", output['branch']+"..."+output["remote"],
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, _ = await ab_proc.communicate()
|
||||
commits_ahead = stdout.decode("utf-8").split()[0]
|
||||
commits_behind = stdout.decode("utf-8").split()[1]
|
||||
|
||||
stdout, _ = await hd_proc.communicate()
|
||||
hd_diff = stdout.decode("utf-8")
|
||||
output['dev'] = False
|
||||
|
||||
# dev environment tests
|
||||
if hd_diff != "": # difference between HEAD and remote
|
||||
output['dev'] = True
|
||||
elif config.branch != output["branch"]: # branch has been changed
|
||||
output['dev'] = True
|
||||
elif ix_diff != "": # difference between index and remote
|
||||
output['dev'] = True
|
||||
elif int(commits_ahead) > 0: # ahead in commits
|
||||
output['dev'] = True
|
||||
|
||||
output['update'] = False
|
||||
if not output['dev'] and int(commits_behind) > 0:
|
||||
output['update'] = True
|
||||
except subprocess.SubprocessError as err:
|
||||
print(err)
|
||||
output["commit_id"] = await get_git_latest_commit_id()
|
||||
output["branch"] = await get_git_branch()
|
||||
output["remote"] = await get_git_remote()
|
||||
output["dev"] = await get_git_dev(output['remote'], output['branch'])
|
||||
output['update'] = await get_git_update(output['dev'], output['branch'], output['remote'])
|
||||
return output
|
||||
|
||||
@plugin.command
|
||||
@lightbulb.command("ping", "ping the bot.", ephemeral=True)
|
||||
@lightbulb.command("ping", "pong!", ephemeral=True)
|
||||
@lightbulb.implements(lightbulb.SlashCommand)
|
||||
async def ping(ctx: lightbulb.Context) -> None:
|
||||
embed = hikari.Embed(title="Ping!",
|
||||
embed = hikari.Embed(title="Pong!",
|
||||
description=f"latency: {round(ctx.bot.heartbeat_latency * 1000, ndigits=2)}ms"
|
||||
)
|
||||
await ctx.respond(embed)
|
||||
|
||||
@plugin.command
|
||||
@lightbulb.command("info", "get bot info", ephemeral=True)
|
||||
@lightbulb.command("info", "get bot information such as the version, and repository link.", ephemeral=True)
|
||||
@lightbulb.implements(lightbulb.SlashCommand)
|
||||
async def info(ctx: lightbulb.Context) -> None:
|
||||
git_status = await get_git_status()
|
||||
|
|
|
@ -6,16 +6,22 @@ from dataclasses import dataclass
|
|||
class Config(object):
|
||||
discord_token: str | None = None
|
||||
discord_guild_id: int | None = None
|
||||
discord_study_room_thread: int | None = None
|
||||
git_branch: str | None = None
|
||||
git_url: str | None = None
|
||||
google_calendar_id: str | None = None
|
||||
time_zone: str | None = None
|
||||
|
||||
def config_decoder(obj):
|
||||
if '__type__' in obj and obj['__type__'] == "Config":
|
||||
return Config(
|
||||
discord_token=obj['discord_token'],
|
||||
discord_guild_id=obj['discord_guild_id'],
|
||||
discord_study_room_thread=obj['discord_study_room_thread'],
|
||||
git_branch=obj['git_branch'],
|
||||
git_url=obj['git_url']
|
||||
git_url=obj['git_url'],
|
||||
google_calendar_id=obj['google_calendar_id'],
|
||||
time_zone=obj['time_zone']
|
||||
)
|
||||
|
||||
def save_config(config: dict):
|
||||
|
|
62
lib/gcal.py
Normal file
62
lib/gcal.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, time
|
||||
from pathlib import Path
|
||||
import json
|
||||
import pytz
|
||||
|
||||
from aiogoogle import Aiogoogle
|
||||
|
||||
from lib.config import load_config
|
||||
|
||||
config = load_config()
|
||||
|
||||
class GoogleCalendarAPI():
|
||||
def __init__(self, creds_path: Path = Path("token.json"), calendar_id = config.google_calendar_id):
|
||||
self._creds_path = creds_path
|
||||
self._calendar_id = calendar_id
|
||||
|
||||
def get_user_creds(self) -> dict:
|
||||
with Path(self._creds_path).open("r", encoding="utf-8") as fp:
|
||||
creds = json.load(fp)
|
||||
output = {"access_token": creds['token'], 'refresh_token': creds['refresh_token']}
|
||||
return output
|
||||
|
||||
def get_client_creds(self) -> dict:
|
||||
"""Taken from an Installed App credentials file"""
|
||||
with Path(self._creds_path).open("r", encoding="utf-8") as fp:
|
||||
creds = json.load(fp)
|
||||
output = {
|
||||
'client_id': creds['client_id'],
|
||||
'client_secret': creds['client_secret'],
|
||||
'scopes': creds['scopes']
|
||||
}
|
||||
return output
|
||||
|
||||
async def get_event(self, event_id) -> dict:
|
||||
async with Aiogoogle(user_creds=self.get_user_creds(), client_creds=self.get_client_creds()) as aiogoogle:
|
||||
api = await aiogoogle.discover("calendar", "v3")
|
||||
result = await aiogoogle.as_user(api.events.get(calendarId=self._calendar_id, eventId=event_id))
|
||||
return result
|
||||
|
||||
async def get_events(self,
|
||||
start_time = datetime.combine(datetime.now(), time.min, tzinfo=pytz.timezone(config.time_zone)).isoformat(),
|
||||
end_time = datetime.combine(datetime.now(), time.max, tzinfo=pytz.timezone(config.time_zone)).isoformat()
|
||||
):
|
||||
async with Aiogoogle(user_creds=self.get_user_creds(), client_creds=self.get_client_creds()) as aiogoogle:
|
||||
api = await aiogoogle.discover("calendar", "v3")
|
||||
result = await aiogoogle.as_user(api.events.list(calendarId = self._calendar_id, timeMin = start_time, timeMax = end_time))
|
||||
return result['items']
|
||||
|
||||
async def create_event(self, name: str, start_time: datetime, end_time: datetime):
|
||||
async with Aiogoogle(user_creds=self.get_user_creds(), client_creds=self.get_client_creds()) as aiogoogle:
|
||||
api = await aiogoogle.discover("calendar", "v3")
|
||||
event = {
|
||||
"summary": "",
|
||||
"start": {},
|
||||
"end": {}
|
||||
}
|
||||
event['summary'] = name
|
||||
event['start']['dateTime'] = start_time.isoformat().strip()
|
||||
event['end']['dateTime'] = end_time.isoformat().strip()
|
||||
await aiogoogle.as_user(api.events.insert(calendarId=self._calendar_id, json=event))
|
87
lib/ocr.py
Normal file
87
lib/ocr.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
import re
|
||||
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
import pytz
|
||||
|
||||
from pytesseract import image_to_string
|
||||
from PIL import Image
|
||||
|
||||
from lib.config import load_config
|
||||
from lib.room import Room
|
||||
|
||||
__all__ = ["get_room_data"]
|
||||
|
||||
config = load_config()
|
||||
|
||||
RE_STRING = re.compile( # https://regex101.com/r/ELsqrO/1
|
||||
r"(L-[0-9]{4}): " # room number (group 1)
|
||||
# time-slot (group 2,3,4 - group 5,6,7)
|
||||
r"([1-12]{1,2}):([0,3]{2})(am|pm) - ([1-12]{1,2}):([0,3]{2})(am|pm), "
|
||||
# weekday (group 8)
|
||||
r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday), "
|
||||
# month (group 9)
|
||||
r"(January|February|March|April|May|June|July|August|September|October|November|December) "
|
||||
r"([1-9][0-9]|[1-9]), ([0-9]{4})", # day & year (group 10 & 11)
|
||||
flags=re.M
|
||||
)
|
||||
|
||||
|
||||
class NotAMatchException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoMatchException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def add_zero_padding(value: str):
|
||||
"""Adds zero-padding to a single digit value."""
|
||||
if int(value) < 10:
|
||||
return "0"+str(int(value)) # this removes leading 0 if already present
|
||||
return value
|
||||
|
||||
|
||||
def libcal_to_datetime(year: str, month: str, day: str, hour: str, minute: str, am_pm: str) -> datetime:
|
||||
"""Takes date information as displayed by LibCal and turns it into a datetime object.
|
||||
All values should be given in an unmodified string format."""
|
||||
date = datetime.strptime(f"{year}-{month}-{add_zero_padding(day)}-"
|
||||
f"{add_zero_padding(hour)}-{add_zero_padding(minute)}-{am_pm}",
|
||||
"%Y-%B-%d-%I-%M-%p")
|
||||
tz = pytz.timezone(config.time_zone)
|
||||
return tz.localize(date) # adds timezone info to object
|
||||
|
||||
|
||||
def correct_commas(string: str):
|
||||
"""Ensures all commas have a space after them in the given string."""
|
||||
return re.sub(r"(,)([^ ])", r"\1 \2", string)
|
||||
|
||||
|
||||
def correct_newlines(string: str):
|
||||
"""Replaces all newlines with a space in the given string."""
|
||||
return string.replace("\n", " ")
|
||||
|
||||
|
||||
def get_room_data(img: bytes) -> list[Room]:
|
||||
"""Gets the room data from a given image"""
|
||||
rooms: list[Room] = []
|
||||
start_time: datetime | None = None
|
||||
end_time: datetime | None = None
|
||||
img_string = image_to_string(Image.open(BytesIO(img)))
|
||||
img_string = correct_newlines(img_string)
|
||||
img_string = correct_commas(img_string)
|
||||
matches = re.finditer(RE_STRING, img_string)
|
||||
for match in matches:
|
||||
if isinstance(match, re.Match):
|
||||
room_number = match.group(1)
|
||||
start_time = libcal_to_datetime(match.group(11), match.group(
|
||||
9), match.group(10), match.group(2), match.group(3), match.group(4))
|
||||
end_time = libcal_to_datetime(match.group(11), match.group(
|
||||
9), match.group(10), match.group(5), match.group(6), match.group(7))
|
||||
room = Room(room_number, start_time, end_time)
|
||||
rooms.append(room)
|
||||
else:
|
||||
raise NotAMatchException(match)
|
||||
if len(rooms) == 0:
|
||||
raise NoMatchException
|
||||
return rooms
|
21
lib/room.py
Normal file
21
lib/room.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
from datetime import datetime
|
||||
import dateutil.parser as parser
|
||||
|
||||
|
||||
class Room:
|
||||
def __init__(self, number: str, start_time: datetime, end_time: datetime):
|
||||
self.number: str = number
|
||||
self.start_time: datetime = start_time
|
||||
self.end_time: datetime = end_time
|
||||
|
||||
@classmethod
|
||||
def from_event_dict(cls, body: dict):
|
||||
start_time = parser.parse(body['start']['dateTime'])
|
||||
end_time = parser.parse(body['end']['dateTime'])
|
||||
return cls(body['summary'], start_time, end_time)
|
||||
|
||||
def get_time_str(self) -> str:
|
||||
date = self.start_time.strftime("%B %d")
|
||||
start = self.start_time.strftime("%H:%M")
|
||||
end = self.end_time.strftime("%H:%M")
|
||||
return f"{date}, {start} - {end}"
|
2
main.py
2
main.py
|
@ -1,4 +1,5 @@
|
|||
import lightbulb
|
||||
import miru
|
||||
|
||||
from lib.config import load_config
|
||||
|
||||
|
@ -6,5 +7,6 @@ config = load_config()
|
|||
|
||||
bot = lightbulb.BotApp(config.discord_token, default_enabled_guilds=[config.discord_guild_id])
|
||||
bot.load_extensions_from("ext/")
|
||||
miru.load(bot)
|
||||
|
||||
bot.run()
|
||||
|
|
Loading…
Reference in a new issue