diff --git a/api.py b/api.py index e3d84b9..1358186 100644 --- a/api.py +++ b/api.py @@ -1,6 +1,5 @@ import importlib import logging -import platform from pathlib import Path from fastapi import FastAPI @@ -20,10 +19,7 @@ def load_module(name: str): for m_path in Path("routers").glob("*"): if "__pycache__" in m_path.parts or "__init__" == m_path.stem: continue - if platform.system() != "Windows": - load_module(str(m_path).replace("/", ".").replace(".py", "")) - else: - load_module(str(m_path).replace("\\", ".").replace(".py", "")) + load_module(str(m_path).replace("/", ".").replace(".py", "")) @api.get("/") diff --git a/core/storage.py b/core/storage.py deleted file mode 100644 index 6f4fe01..0000000 --- a/core/storage.py +++ /dev/null @@ -1,35 +0,0 @@ -import json - -from pathlib import Path - -class Storage: - """Handles all filesystem interactions for the API""" - def __init__(self, router: str) -> None: - self.root = Path.home().joinpath(".radical_api").joinpath(router) - self.root.mkdir(exist_ok=True, parents=True) - - def _get_file(self, path: str) -> Path: - return self.root.joinpath(path) - - def write_file(self, name: str, data: dict): - file = self._get_file(name) - file.touch(exist_ok=True) - with file.open("w+") as f: - json.dump(data, f, indent=4, sort_keys=True) - - def read_file(self, name: str) -> dict: - file = self._get_file(name) - if not file.exists(): - return {} - with file.open("r+") as f: - data = json.load(f) - return data - - def delete_file(self, name: str): - file = self._get_file(name) - file.unlink(missing_ok=True) - - def rename_file(self, old_name: str, new_name: str): - file = self._get_file(old_name) - new_file = self.root.joinpath(new_name) - file.rename(new_file) \ No newline at end of file diff --git a/core/tracker.py b/core/tracker.py index b832b39..962b716 100644 --- a/core/tracker.py +++ b/core/tracker.py @@ -1,10 +1,10 @@ import json from datetime import datetime, timedelta from fastapi.encoders import jsonable_encoder +from pathlib import Path -from core.storage import Storage +DATA_DIR = Path.home().joinpath(".radical-api/trackers") -storage = Storage("trackers") def object_hook(dct): if "datetime" in dct: @@ -28,13 +28,24 @@ class Tracker: def rename(self, name): """Rename the Tracker.""" - storage.rename_file(f"{self.name}.json", f"{name}.json") + DATA_DIR.joinpath(f"{self.name}.json").rename( + DATA_DIR.joinpath(f"{name}.json")) self.name = name - storage.write_file(f"{self.name}.json", jsonable_encoder(self)) + self.save() def delete(self): """Delete the Tracker.""" - storage.delete_file(f"{self.name}.json") + filepath = DATA_DIR.joinpath(f"{self.name}.json") + filepath.unlink() + + def save(self): + """Save the tracker to JSON.""" + DATA_DIR.mkdir(exist_ok=True) + filepath = DATA_DIR.joinpath(f"{self.name}.json") + filepath.touch(exist_ok=True) + with filepath.open("w+") as fp: + data = jsonable_encoder(self) + json.dump(data, fp, indent=4) def modify_point(self, date_str: str, value: int): """Modify a point. Change its assigned value to the one given.""" @@ -43,13 +54,13 @@ class Tracker: if point.datetime == date_time: point.value = value break - storage.write_file(f"{self.name}.json", jsonable_encoder(self)) + self.save() def add_point(self, date_str: str, value: int): """Add a point to the tracker.""" point = TrackerPoint(date_str, value) self.points.append(point) - storage.write_file(f"{self.name}.json", jsonable_encoder(self)) + self.save() def delete_point(self, date_str: str): """Remove a point from the tracker.""" @@ -58,7 +69,7 @@ class Tracker: if point.datetime == date_time: self.points.remove(point) break - storage.write_file(f"{self.name}.json", jsonable_encoder(self)) + self.save() def list_points(self, start_date: datetime = None, end_date: datetime = None) -> list: """ @@ -89,10 +100,10 @@ class Tracker: @classmethod def from_data(cls, name): """Load a tracker from the DATA_DIR.""" - data = storage.read_file(f"{name}.json") - return cls.from_json(data) + filepath = DATA_DIR.joinpath(f"{name}.json") + return cls.from_json(filepath.read_text()) @staticmethod def from_json(json_str): """Load a tracker from a JSON string.""" - return json.load(json_str, object_hook=object_hook) + return json.loads(json_str, object_hook=object_hook) diff --git a/routers/gitea.py b/routers/gitea.py index 5d5217b..4caed8e 100644 --- a/routers/gitea.py +++ b/routers/gitea.py @@ -1,19 +1,20 @@ import logging from typing import Optional import giteapy +import json +from pathlib import Path from fastapi import APIRouter, Header, Query, HTTPException -from core.storage import Storage - -storage = Storage("gitea") +ROOT_PATH = Path.home().joinpath(".radical_api/gitea") logger = logging.getLogger("gitea") router = APIRouter() @router.get("/gitea/latest") def get_latest_version(base_url: Optional[str] = Header(None), author: str = Query(None), repo: str = Query(None), pre_releases: bool = Query(False)): - data = storage.read_file("keys.json") + with ROOT_PATH.joinpath("keys.json").open("r") as fp: + data: dict = json.load(fp) if base_url in data.keys(): config = giteapy.Configuration() config.api_key['access_token'] = data[base_url] diff --git a/routers/lights.py b/routers/lights.py deleted file mode 100644 index 470094b..0000000 --- a/routers/lights.py +++ /dev/null @@ -1,57 +0,0 @@ -import logging - -import pywizlight -from pywizlight.scenes import get_id_from_scene_name, SCENES - -from fastapi import APIRouter, Depends -from fastapi.security.api_key import APIKey - -from core.storage import Storage - -from .security import get_api_key - -logger = logging.getLogger("lights") -router = APIRouter(prefix="/lights", tags=["lights"]) - -storage = Storage("lights") - -async def get_lights(access_token: APIKey = Depends(get_api_key), target: str = None) -> list[pywizlight.wizlight]: - try: - bulb_ips = storage.read_file("lights.json") - bulbs = [] - for ip in bulb_ips: - if target is not None and ip != target: - continue - bulbs.append(pywizlight.wizlight(ip)) - if len(bulbs) == 0: - raise ValueError - except (FileNotFoundError, ValueError): - storage.write_file("lights.json", await scan_lights(access_token)) - return bulbs - -@router.get("/scan") -async def scan_lights(access_token: APIKey = Depends(get_api_key)) -> list[str]: - bulbs = await pywizlight.discovery.discover_lights() - bulb_ips = [] - for bulb in bulbs: - bulb_ips.append(bulb.ip) - return bulb_ips - -# make this rescan if light is not found on network - -@router.get("/on") -async def lights_on(scene: str = None, target: str = None, access_token: APIKey = Depends(get_api_key)) -> None: - for light in await get_lights(access_token, target): - if scene is not None: - await light.turn_on(pywizlight.PilotBuilder(scene = get_id_from_scene_name(scene))) - else: - await light.turn_on() - -@router.get("/off") -async def lights_off(target: str, access_token: APIKey = Depends(get_api_key)) -> None: - for light in await get_lights(access_token, target): - await light.turn_off() - -@router.get("/scenes") -async def lights_scenes(access_token: APIKey = Depends(get_api_key)) -> None: - return SCENES diff --git a/routers/mc_optimizer.py b/routers/mc_optimizer.py new file mode 100644 index 0000000..e3d3b88 --- /dev/null +++ b/routers/mc_optimizer.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from fastapi import APIRouter, HTTPException, Query +from fastapi.responses import StreamingResponse +from pathlib import Path +from datetime import datetime +from zipfile import ZipFile +import json + +from starlette.responses import FileResponse + + +router = APIRouter(prefix="/mc_optimizer") +ROOT_PATH = Path.home().joinpath(".radical_api/mc_optimizer") +ROOT_PATH.mkdir(exist_ok=True) + +class PackFile: + def __init__(self, file: Path) -> None: + self.file = file + self.manifest = self.set_manifest() + self.datetime = datetime.strptime(self.manifest['build-date'], "%d-%m-%Y-%H%M%S") + self.build = self.manifest['build'] + + def set_manifest(self): + zf = ZipFile(self.file) + return json.loads(zf.read("manifest.json").decode("utf-8")) + + def __lt__(self, other: PackFile): + return self.datetime.__lt__(other.datetime) + +def iterfile(path: Path): + with path.open('rb') as fp: + yield from fp + +@router.get("/download/latest") +def latest_pack(): + files = [] + for file in ROOT_PATH.iterdir(): + files.append(PackFile(file)) + files.sort() + if len(files) == 0: + raise HTTPException(404, "No files found.") + return FileResponse(files[-1].file, filename=f"mc-optimizer-pack-{files[-1].build}.zip") + +@router.get("/download/build") +def get_build(build: int): + for file in ROOT_PATH.iterdir(): + pf = PackFile(file) + if pf.build == build: + return FileResponse(pf.file, filename=f"mc-optimizer-pack-{pf.build}.zip") + raise HTTPException(404, "Build not found.") + +@router.get("/manifest/latest") +def get_latest_manifest(): + files = [] + for file in ROOT_PATH.iterdir(): + files.append(PackFile(file)) + files.sort() + if len(files) == 0: + raise HTTPException(404, "No files found.") + return files[-1].manifest + +@router.get("/manifest/builds") +def get_builds(): + builds = [] + for file in ROOT_PATH.iterdir(): + pf = PackFile(file) + builds.append(pf.manifest) + return builds \ No newline at end of file diff --git a/routers/security.py b/routers/security.py index 9d2d28d..4e048a0 100644 --- a/routers/security.py +++ b/routers/security.py @@ -3,14 +3,14 @@ from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, AP from starlette.status import HTTP_403_FORBIDDEN +from pathlib import Path import logging import uuid - -from core.storage import Storage +import json router = APIRouter() logger = logging.getLogger("security") -storage = Storage("security") +ROOT_PATH = Path.home().joinpath(".radical_api/keys") API_KEY_NAME = "access_token" @@ -18,17 +18,24 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False) api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False) -try: - system_key = storage.read_file("system_key.json")["system_key"] -except FileNotFoundError: +system_key_path = ROOT_PATH.joinpath("system_key") +if not system_key_path.exists(): logger.info("System Key not found, generating.") + system_key_path.parent.mkdir(parents=True, exist_ok=True) + system_key_path.touch() system_uuid = str(uuid.uuid4()) - storage.write_file("system_key.json", {"system_key": system_uuid}) + system_key_path.write_text(system_uuid) logger.info(f"System UUID: {system_uuid}") def load_keys(): - return storage.read_file("api_keys.json") + path = ROOT_PATH.joinpath("api_keys.json") + if not path.exists(): + path.touch() + path.write_text("{}") + with path.open("r") as f: + data = json.load(f) + return data def get_system_key( @@ -36,12 +43,12 @@ def get_system_key( api_key_header: str = Security(api_key_header), api_key_cookie: str = Security(api_key_cookie) ): - system_key - if api_key_query == system_key: + path = ROOT_PATH.joinpath("system_key") + if api_key_query == path.read_text(): return api_key_query - elif api_key_header == system_key: + elif api_key_header == path.read_text(): return api_key_header - elif api_key_cookie == system_key: + elif api_key_cookie == path.read_text(): return api_key_cookie else: raise HTTPException( @@ -68,11 +75,13 @@ def get_api_key( @router.get("/key/create") def create_key(name, system_key: APIKey = Depends(get_system_key)): - keys: dict = storage.read_file("api_keys.json") + keys: dict = load_keys() new_key = str(uuid.uuid4()) keys[new_key] = {} keys[new_key]["name"] = name - storage.write_file("api_keys.json", keys) + path = ROOT_PATH.joinpath("api_keys.json") + with path.open("w") as f: + json.dump(keys, f, indent=4, sort_keys=True) return new_key