From 523baf7b573d49eb73480b194f218854c1048c58 Mon Sep 17 00:00:00 2001 From: Riley Housden Date: Sat, 12 Mar 2022 23:50:33 -0500 Subject: [PATCH] Improve storage management - Unified all storage access to a single class - Implemented new storage access to all routers in the API. - Fixes #1 --- core/storage.py | 35 +++++++++++++++++++++++++++++ core/tracker.py | 33 +++++++++------------------ routers/gitea.py | 9 ++++---- routers/lights.py | 55 ++++++++++++++++++++++++++++----------------- routers/security.py | 37 ++++++++++++------------------ 5 files changed, 98 insertions(+), 71 deletions(-) create mode 100644 core/storage.py diff --git a/core/storage.py b/core/storage.py new file mode 100644 index 0000000..6f4fe01 --- /dev/null +++ b/core/storage.py @@ -0,0 +1,35 @@ +import json + +from pathlib import Path + +class Storage: + """Handles all filesystem interactions for the API""" + def __init__(self, router: str) -> None: + self.root = Path.home().joinpath(".radical_api").joinpath(router) + self.root.mkdir(exist_ok=True, parents=True) + + def _get_file(self, path: str) -> Path: + return self.root.joinpath(path) + + def write_file(self, name: str, data: dict): + file = self._get_file(name) + file.touch(exist_ok=True) + with file.open("w+") as f: + json.dump(data, f, indent=4, sort_keys=True) + + def read_file(self, name: str) -> dict: + file = self._get_file(name) + if not file.exists(): + return {} + with file.open("r+") as f: + data = json.load(f) + return data + + def delete_file(self, name: str): + file = self._get_file(name) + file.unlink(missing_ok=True) + + def rename_file(self, old_name: str, new_name: str): + file = self._get_file(old_name) + new_file = self.root.joinpath(new_name) + file.rename(new_file) \ No newline at end of file diff --git a/core/tracker.py b/core/tracker.py index 962b716..b832b39 100644 --- a/core/tracker.py +++ b/core/tracker.py @@ -1,10 +1,10 @@ import json from datetime import datetime, timedelta from fastapi.encoders import jsonable_encoder -from pathlib import Path -DATA_DIR = Path.home().joinpath(".radical-api/trackers") +from core.storage import Storage +storage = Storage("trackers") def object_hook(dct): if "datetime" in dct: @@ -28,24 +28,13 @@ class Tracker: def rename(self, name): """Rename the Tracker.""" - DATA_DIR.joinpath(f"{self.name}.json").rename( - DATA_DIR.joinpath(f"{name}.json")) + storage.rename_file(f"{self.name}.json", f"{name}.json") self.name = name - self.save() + storage.write_file(f"{self.name}.json", jsonable_encoder(self)) def delete(self): """Delete the Tracker.""" - filepath = DATA_DIR.joinpath(f"{self.name}.json") - filepath.unlink() - - def save(self): - """Save the tracker to JSON.""" - DATA_DIR.mkdir(exist_ok=True) - filepath = DATA_DIR.joinpath(f"{self.name}.json") - filepath.touch(exist_ok=True) - with filepath.open("w+") as fp: - data = jsonable_encoder(self) - json.dump(data, fp, indent=4) + storage.delete_file(f"{self.name}.json") def modify_point(self, date_str: str, value: int): """Modify a point. Change its assigned value to the one given.""" @@ -54,13 +43,13 @@ class Tracker: if point.datetime == date_time: point.value = value break - self.save() + storage.write_file(f"{self.name}.json", jsonable_encoder(self)) def add_point(self, date_str: str, value: int): """Add a point to the tracker.""" point = TrackerPoint(date_str, value) self.points.append(point) - self.save() + storage.write_file(f"{self.name}.json", jsonable_encoder(self)) def delete_point(self, date_str: str): """Remove a point from the tracker.""" @@ -69,7 +58,7 @@ class Tracker: if point.datetime == date_time: self.points.remove(point) break - self.save() + storage.write_file(f"{self.name}.json", jsonable_encoder(self)) def list_points(self, start_date: datetime = None, end_date: datetime = None) -> list: """ @@ -100,10 +89,10 @@ class Tracker: @classmethod def from_data(cls, name): """Load a tracker from the DATA_DIR.""" - filepath = DATA_DIR.joinpath(f"{name}.json") - return cls.from_json(filepath.read_text()) + data = storage.read_file(f"{name}.json") + return cls.from_json(data) @staticmethod def from_json(json_str): """Load a tracker from a JSON string.""" - return json.loads(json_str, object_hook=object_hook) + return json.load(json_str, object_hook=object_hook) diff --git a/routers/gitea.py b/routers/gitea.py index 4caed8e..5d5217b 100644 --- a/routers/gitea.py +++ b/routers/gitea.py @@ -1,20 +1,19 @@ import logging from typing import Optional import giteapy -import json -from pathlib import Path from fastapi import APIRouter, Header, Query, HTTPException -ROOT_PATH = Path.home().joinpath(".radical_api/gitea") +from core.storage import Storage + +storage = Storage("gitea") logger = logging.getLogger("gitea") router = APIRouter() @router.get("/gitea/latest") def get_latest_version(base_url: Optional[str] = Header(None), author: str = Query(None), repo: str = Query(None), pre_releases: bool = Query(False)): - with ROOT_PATH.joinpath("keys.json").open("r") as fp: - data: dict = json.load(fp) + data = storage.read_file("keys.json") if base_url in data.keys(): config = giteapy.Configuration() config.api_key['access_token'] = data[base_url] diff --git a/routers/lights.py b/routers/lights.py index 5741046..470094b 100644 --- a/routers/lights.py +++ b/routers/lights.py @@ -1,44 +1,57 @@ import logging -from pathlib import Path import pywizlight -from pywizlight.scenes import get_id_from_scene_name +from pywizlight.scenes import get_id_from_scene_name, SCENES from fastapi import APIRouter, Depends from fastapi.security.api_key import APIKey +from core.storage import Storage + from .security import get_api_key logger = logging.getLogger("lights") router = APIRouter(prefix="/lights", tags=["lights"]) -# TODO: improve resource locating for API -ROOT_PATH = Path.home().joinpath(".radical_api/lights") -ROOT_PATH.mkdir(parents=True, exist_ok=True) +storage = Storage("lights") -# make this rescan if light is not found on network -@router.get("/scan") -async def get_lights(force: bool = False, access_token: APIKey = Depends(get_api_key)) -> list[pywizlight.wizlight]: - if not ROOT_PATH.joinpath("lights.json").exists() or force: - bulbs = await pywizlight.discovery.discover_lights() - with ROOT_PATH.joinpath("lights.json").open("w+") as f: - for bulb in bulbs: - f.write(f"{bulb.ip}\n") - else: +async def get_lights(access_token: APIKey = Depends(get_api_key), target: str = None) -> list[pywizlight.wizlight]: + try: + bulb_ips = storage.read_file("lights.json") bulbs = [] - for line in ROOT_PATH.joinpath("lights.json").read_text().split(): - bulbs.append(pywizlight.wizlight(line)) + for ip in bulb_ips: + if target is not None and ip != target: + continue + bulbs.append(pywizlight.wizlight(ip)) + if len(bulbs) == 0: + raise ValueError + except (FileNotFoundError, ValueError): + storage.write_file("lights.json", await scan_lights(access_token)) return bulbs +@router.get("/scan") +async def scan_lights(access_token: APIKey = Depends(get_api_key)) -> list[str]: + bulbs = await pywizlight.discovery.discover_lights() + bulb_ips = [] + for bulb in bulbs: + bulb_ips.append(bulb.ip) + return bulb_ips + +# make this rescan if light is not found on network + @router.get("/on") -async def lights_on(scene: str = None, access_token: APIKey = Depends(get_api_key)) -> None: - for light in await get_lights(access_token): +async def lights_on(scene: str = None, target: str = None, access_token: APIKey = Depends(get_api_key)) -> None: + for light in await get_lights(access_token, target): if scene is not None: await light.turn_on(pywizlight.PilotBuilder(scene = get_id_from_scene_name(scene))) else: await light.turn_on() @router.get("/off") -async def lights_off(access_token: APIKey = Depends(get_api_key)) -> None: - for light in await get_lights(access_token): - await light.turn_off() \ No newline at end of file +async def lights_off(target: str, access_token: APIKey = Depends(get_api_key)) -> None: + for light in await get_lights(access_token, target): + await light.turn_off() + +@router.get("/scenes") +async def lights_scenes(access_token: APIKey = Depends(get_api_key)) -> None: + return SCENES diff --git a/routers/security.py b/routers/security.py index 4e048a0..9d2d28d 100644 --- a/routers/security.py +++ b/routers/security.py @@ -3,14 +3,14 @@ from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, AP from starlette.status import HTTP_403_FORBIDDEN -from pathlib import Path import logging import uuid -import json + +from core.storage import Storage router = APIRouter() logger = logging.getLogger("security") -ROOT_PATH = Path.home().joinpath(".radical_api/keys") +storage = Storage("security") API_KEY_NAME = "access_token" @@ -18,24 +18,17 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False) api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False) -system_key_path = ROOT_PATH.joinpath("system_key") -if not system_key_path.exists(): +try: + system_key = storage.read_file("system_key.json")["system_key"] +except FileNotFoundError: logger.info("System Key not found, generating.") - system_key_path.parent.mkdir(parents=True, exist_ok=True) - system_key_path.touch() system_uuid = str(uuid.uuid4()) - system_key_path.write_text(system_uuid) + storage.write_file("system_key.json", {"system_key": system_uuid}) logger.info(f"System UUID: {system_uuid}") def load_keys(): - path = ROOT_PATH.joinpath("api_keys.json") - if not path.exists(): - path.touch() - path.write_text("{}") - with path.open("r") as f: - data = json.load(f) - return data + return storage.read_file("api_keys.json") def get_system_key( @@ -43,12 +36,12 @@ def get_system_key( api_key_header: str = Security(api_key_header), api_key_cookie: str = Security(api_key_cookie) ): - path = ROOT_PATH.joinpath("system_key") - if api_key_query == path.read_text(): + system_key + if api_key_query == system_key: return api_key_query - elif api_key_header == path.read_text(): + elif api_key_header == system_key: return api_key_header - elif api_key_cookie == path.read_text(): + elif api_key_cookie == system_key: return api_key_cookie else: raise HTTPException( @@ -75,13 +68,11 @@ def get_api_key( @router.get("/key/create") def create_key(name, system_key: APIKey = Depends(get_system_key)): - keys: dict = load_keys() + keys: dict = storage.read_file("api_keys.json") new_key = str(uuid.uuid4()) keys[new_key] = {} keys[new_key]["name"] = name - path = ROOT_PATH.joinpath("api_keys.json") - with path.open("w") as f: - json.dump(keys, f, indent=4, sort_keys=True) + storage.write_file("api_keys.json", keys) return new_key