Compare commits

...

4 commits

Author SHA1 Message Date
523baf7b57
Improve storage management
- Unified all storage access to a single class
- Implemented new storage access to all routers in the API.
- Fixes #1
2022-03-12 23:50:33 -05:00
d001c10efd
Remove MC optimizer router 2022-03-11 11:07:52 -05:00
7c89b692df
Add lights router to control WiZ lights 2022-03-11 11:07:33 -05:00
6ebfc7d3fb
Fix router loading on Windows systems 2022-03-11 11:06:55 -05:00
7 changed files with 126 additions and 120 deletions

6
api.py
View file

@ -1,5 +1,6 @@
import importlib
import logging
import platform
from pathlib import Path
from fastapi import FastAPI
@ -19,7 +20,10 @@ def load_module(name: str):
for m_path in Path("routers").glob("*"):
if "__pycache__" in m_path.parts or "__init__" == m_path.stem:
continue
load_module(str(m_path).replace("/", ".").replace(".py", ""))
if platform.system() != "Windows":
load_module(str(m_path).replace("/", ".").replace(".py", ""))
else:
load_module(str(m_path).replace("\\", ".").replace(".py", ""))
@api.get("/")

35
core/storage.py Normal file
View file

@ -0,0 +1,35 @@
import json
from pathlib import Path
class Storage:
"""Handles all filesystem interactions for the API"""
def __init__(self, router: str) -> None:
self.root = Path.home().joinpath(".radical_api").joinpath(router)
self.root.mkdir(exist_ok=True, parents=True)
def _get_file(self, path: str) -> Path:
return self.root.joinpath(path)
def write_file(self, name: str, data: dict):
file = self._get_file(name)
file.touch(exist_ok=True)
with file.open("w+") as f:
json.dump(data, f, indent=4, sort_keys=True)
def read_file(self, name: str) -> dict:
file = self._get_file(name)
if not file.exists():
return {}
with file.open("r+") as f:
data = json.load(f)
return data
def delete_file(self, name: str):
file = self._get_file(name)
file.unlink(missing_ok=True)
def rename_file(self, old_name: str, new_name: str):
file = self._get_file(old_name)
new_file = self.root.joinpath(new_name)
file.rename(new_file)

View file

@ -1,10 +1,10 @@
import json
from datetime import datetime, timedelta
from fastapi.encoders import jsonable_encoder
from pathlib import Path
DATA_DIR = Path.home().joinpath(".radical-api/trackers")
from core.storage import Storage
storage = Storage("trackers")
def object_hook(dct):
if "datetime" in dct:
@ -28,24 +28,13 @@ class Tracker:
def rename(self, name):
"""Rename the Tracker."""
DATA_DIR.joinpath(f"{self.name}.json").rename(
DATA_DIR.joinpath(f"{name}.json"))
storage.rename_file(f"{self.name}.json", f"{name}.json")
self.name = name
self.save()
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
def delete(self):
"""Delete the Tracker."""
filepath = DATA_DIR.joinpath(f"{self.name}.json")
filepath.unlink()
def save(self):
"""Save the tracker to JSON."""
DATA_DIR.mkdir(exist_ok=True)
filepath = DATA_DIR.joinpath(f"{self.name}.json")
filepath.touch(exist_ok=True)
with filepath.open("w+") as fp:
data = jsonable_encoder(self)
json.dump(data, fp, indent=4)
storage.delete_file(f"{self.name}.json")
def modify_point(self, date_str: str, value: int):
"""Modify a point. Change its assigned value to the one given."""
@ -54,13 +43,13 @@ class Tracker:
if point.datetime == date_time:
point.value = value
break
self.save()
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
def add_point(self, date_str: str, value: int):
"""Add a point to the tracker."""
point = TrackerPoint(date_str, value)
self.points.append(point)
self.save()
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
def delete_point(self, date_str: str):
"""Remove a point from the tracker."""
@ -69,7 +58,7 @@ class Tracker:
if point.datetime == date_time:
self.points.remove(point)
break
self.save()
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
def list_points(self, start_date: datetime = None, end_date: datetime = None) -> list:
"""
@ -100,10 +89,10 @@ class Tracker:
@classmethod
def from_data(cls, name):
"""Load a tracker from the DATA_DIR."""
filepath = DATA_DIR.joinpath(f"{name}.json")
return cls.from_json(filepath.read_text())
data = storage.read_file(f"{name}.json")
return cls.from_json(data)
@staticmethod
def from_json(json_str):
"""Load a tracker from a JSON string."""
return json.loads(json_str, object_hook=object_hook)
return json.load(json_str, object_hook=object_hook)

View file

@ -1,20 +1,19 @@
import logging
from typing import Optional
import giteapy
import json
from pathlib import Path
from fastapi import APIRouter, Header, Query, HTTPException
ROOT_PATH = Path.home().joinpath(".radical_api/gitea")
from core.storage import Storage
storage = Storage("gitea")
logger = logging.getLogger("gitea")
router = APIRouter()
@router.get("/gitea/latest")
def get_latest_version(base_url: Optional[str] = Header(None), author: str = Query(None), repo: str = Query(None), pre_releases: bool = Query(False)):
with ROOT_PATH.joinpath("keys.json").open("r") as fp:
data: dict = json.load(fp)
data = storage.read_file("keys.json")
if base_url in data.keys():
config = giteapy.Configuration()
config.api_key['access_token'] = data[base_url]

57
routers/lights.py Normal file
View file

@ -0,0 +1,57 @@
import logging
import pywizlight
from pywizlight.scenes import get_id_from_scene_name, SCENES
from fastapi import APIRouter, Depends
from fastapi.security.api_key import APIKey
from core.storage import Storage
from .security import get_api_key
logger = logging.getLogger("lights")
router = APIRouter(prefix="/lights", tags=["lights"])
storage = Storage("lights")
async def get_lights(access_token: APIKey = Depends(get_api_key), target: str = None) -> list[pywizlight.wizlight]:
try:
bulb_ips = storage.read_file("lights.json")
bulbs = []
for ip in bulb_ips:
if target is not None and ip != target:
continue
bulbs.append(pywizlight.wizlight(ip))
if len(bulbs) == 0:
raise ValueError
except (FileNotFoundError, ValueError):
storage.write_file("lights.json", await scan_lights(access_token))
return bulbs
@router.get("/scan")
async def scan_lights(access_token: APIKey = Depends(get_api_key)) -> list[str]:
bulbs = await pywizlight.discovery.discover_lights()
bulb_ips = []
for bulb in bulbs:
bulb_ips.append(bulb.ip)
return bulb_ips
# make this rescan if light is not found on network
@router.get("/on")
async def lights_on(scene: str = None, target: str = None, access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token, target):
if scene is not None:
await light.turn_on(pywizlight.PilotBuilder(scene = get_id_from_scene_name(scene)))
else:
await light.turn_on()
@router.get("/off")
async def lights_off(target: str, access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token, target):
await light.turn_off()
@router.get("/scenes")
async def lights_scenes(access_token: APIKey = Depends(get_api_key)) -> None:
return SCENES

View file

@ -1,69 +0,0 @@
from __future__ import annotations
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse
from pathlib import Path
from datetime import datetime
from zipfile import ZipFile
import json
from starlette.responses import FileResponse
router = APIRouter(prefix="/mc_optimizer")
ROOT_PATH = Path.home().joinpath(".radical_api/mc_optimizer")
ROOT_PATH.mkdir(exist_ok=True)
class PackFile:
def __init__(self, file: Path) -> None:
self.file = file
self.manifest = self.set_manifest()
self.datetime = datetime.strptime(self.manifest['build-date'], "%d-%m-%Y-%H%M%S")
self.build = self.manifest['build']
def set_manifest(self):
zf = ZipFile(self.file)
return json.loads(zf.read("manifest.json").decode("utf-8"))
def __lt__(self, other: PackFile):
return self.datetime.__lt__(other.datetime)
def iterfile(path: Path):
with path.open('rb') as fp:
yield from fp
@router.get("/download/latest")
def latest_pack():
files = []
for file in ROOT_PATH.iterdir():
files.append(PackFile(file))
files.sort()
if len(files) == 0:
raise HTTPException(404, "No files found.")
return FileResponse(files[-1].file, filename=f"mc-optimizer-pack-{files[-1].build}.zip")
@router.get("/download/build")
def get_build(build: int):
for file in ROOT_PATH.iterdir():
pf = PackFile(file)
if pf.build == build:
return FileResponse(pf.file, filename=f"mc-optimizer-pack-{pf.build}.zip")
raise HTTPException(404, "Build not found.")
@router.get("/manifest/latest")
def get_latest_manifest():
files = []
for file in ROOT_PATH.iterdir():
files.append(PackFile(file))
files.sort()
if len(files) == 0:
raise HTTPException(404, "No files found.")
return files[-1].manifest
@router.get("/manifest/builds")
def get_builds():
builds = []
for file in ROOT_PATH.iterdir():
pf = PackFile(file)
builds.append(pf.manifest)
return builds

View file

@ -3,14 +3,14 @@ from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, AP
from starlette.status import HTTP_403_FORBIDDEN
from pathlib import Path
import logging
import uuid
import json
from core.storage import Storage
router = APIRouter()
logger = logging.getLogger("security")
ROOT_PATH = Path.home().joinpath(".radical_api/keys")
storage = Storage("security")
API_KEY_NAME = "access_token"
@ -18,24 +18,17 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False)
system_key_path = ROOT_PATH.joinpath("system_key")
if not system_key_path.exists():
try:
system_key = storage.read_file("system_key.json")["system_key"]
except FileNotFoundError:
logger.info("System Key not found, generating.")
system_key_path.parent.mkdir(parents=True, exist_ok=True)
system_key_path.touch()
system_uuid = str(uuid.uuid4())
system_key_path.write_text(system_uuid)
storage.write_file("system_key.json", {"system_key": system_uuid})
logger.info(f"System UUID: {system_uuid}")
def load_keys():
path = ROOT_PATH.joinpath("api_keys.json")
if not path.exists():
path.touch()
path.write_text("{}")
with path.open("r") as f:
data = json.load(f)
return data
return storage.read_file("api_keys.json")
def get_system_key(
@ -43,12 +36,12 @@ def get_system_key(
api_key_header: str = Security(api_key_header),
api_key_cookie: str = Security(api_key_cookie)
):
path = ROOT_PATH.joinpath("system_key")
if api_key_query == path.read_text():
system_key
if api_key_query == system_key:
return api_key_query
elif api_key_header == path.read_text():
elif api_key_header == system_key:
return api_key_header
elif api_key_cookie == path.read_text():
elif api_key_cookie == system_key:
return api_key_cookie
else:
raise HTTPException(
@ -75,13 +68,11 @@ def get_api_key(
@router.get("/key/create")
def create_key(name, system_key: APIKey = Depends(get_system_key)):
keys: dict = load_keys()
keys: dict = storage.read_file("api_keys.json")
new_key = str(uuid.uuid4())
keys[new_key] = {}
keys[new_key]["name"] = name
path = ROOT_PATH.joinpath("api_keys.json")
with path.open("w") as f:
json.dump(keys, f, indent=4, sort_keys=True)
storage.write_file("api_keys.json", keys)
return new_key