Compare commits
4 commits
6cdbdefd54
...
523baf7b57
Author | SHA1 | Date | |
---|---|---|---|
523baf7b57 | |||
d001c10efd | |||
7c89b692df | |||
6ebfc7d3fb |
7 changed files with 126 additions and 120 deletions
4
api.py
4
api.py
|
@ -1,5 +1,6 @@
|
|||
import importlib
|
||||
import logging
|
||||
import platform
|
||||
|
||||
from pathlib import Path
|
||||
from fastapi import FastAPI
|
||||
|
@ -19,7 +20,10 @@ def load_module(name: str):
|
|||
for m_path in Path("routers").glob("*"):
|
||||
if "__pycache__" in m_path.parts or "__init__" == m_path.stem:
|
||||
continue
|
||||
if platform.system() != "Windows":
|
||||
load_module(str(m_path).replace("/", ".").replace(".py", ""))
|
||||
else:
|
||||
load_module(str(m_path).replace("\\", ".").replace(".py", ""))
|
||||
|
||||
|
||||
@api.get("/")
|
||||
|
|
35
core/storage.py
Normal file
35
core/storage.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import json
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
class Storage:
|
||||
"""Handles all filesystem interactions for the API"""
|
||||
def __init__(self, router: str) -> None:
|
||||
self.root = Path.home().joinpath(".radical_api").joinpath(router)
|
||||
self.root.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
def _get_file(self, path: str) -> Path:
|
||||
return self.root.joinpath(path)
|
||||
|
||||
def write_file(self, name: str, data: dict):
|
||||
file = self._get_file(name)
|
||||
file.touch(exist_ok=True)
|
||||
with file.open("w+") as f:
|
||||
json.dump(data, f, indent=4, sort_keys=True)
|
||||
|
||||
def read_file(self, name: str) -> dict:
|
||||
file = self._get_file(name)
|
||||
if not file.exists():
|
||||
return {}
|
||||
with file.open("r+") as f:
|
||||
data = json.load(f)
|
||||
return data
|
||||
|
||||
def delete_file(self, name: str):
|
||||
file = self._get_file(name)
|
||||
file.unlink(missing_ok=True)
|
||||
|
||||
def rename_file(self, old_name: str, new_name: str):
|
||||
file = self._get_file(old_name)
|
||||
new_file = self.root.joinpath(new_name)
|
||||
file.rename(new_file)
|
|
@ -1,10 +1,10 @@
|
|||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from pathlib import Path
|
||||
|
||||
DATA_DIR = Path.home().joinpath(".radical-api/trackers")
|
||||
from core.storage import Storage
|
||||
|
||||
storage = Storage("trackers")
|
||||
|
||||
def object_hook(dct):
|
||||
if "datetime" in dct:
|
||||
|
@ -28,24 +28,13 @@ class Tracker:
|
|||
|
||||
def rename(self, name):
|
||||
"""Rename the Tracker."""
|
||||
DATA_DIR.joinpath(f"{self.name}.json").rename(
|
||||
DATA_DIR.joinpath(f"{name}.json"))
|
||||
storage.rename_file(f"{self.name}.json", f"{name}.json")
|
||||
self.name = name
|
||||
self.save()
|
||||
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
|
||||
|
||||
def delete(self):
|
||||
"""Delete the Tracker."""
|
||||
filepath = DATA_DIR.joinpath(f"{self.name}.json")
|
||||
filepath.unlink()
|
||||
|
||||
def save(self):
|
||||
"""Save the tracker to JSON."""
|
||||
DATA_DIR.mkdir(exist_ok=True)
|
||||
filepath = DATA_DIR.joinpath(f"{self.name}.json")
|
||||
filepath.touch(exist_ok=True)
|
||||
with filepath.open("w+") as fp:
|
||||
data = jsonable_encoder(self)
|
||||
json.dump(data, fp, indent=4)
|
||||
storage.delete_file(f"{self.name}.json")
|
||||
|
||||
def modify_point(self, date_str: str, value: int):
|
||||
"""Modify a point. Change its assigned value to the one given."""
|
||||
|
@ -54,13 +43,13 @@ class Tracker:
|
|||
if point.datetime == date_time:
|
||||
point.value = value
|
||||
break
|
||||
self.save()
|
||||
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
|
||||
|
||||
def add_point(self, date_str: str, value: int):
|
||||
"""Add a point to the tracker."""
|
||||
point = TrackerPoint(date_str, value)
|
||||
self.points.append(point)
|
||||
self.save()
|
||||
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
|
||||
|
||||
def delete_point(self, date_str: str):
|
||||
"""Remove a point from the tracker."""
|
||||
|
@ -69,7 +58,7 @@ class Tracker:
|
|||
if point.datetime == date_time:
|
||||
self.points.remove(point)
|
||||
break
|
||||
self.save()
|
||||
storage.write_file(f"{self.name}.json", jsonable_encoder(self))
|
||||
|
||||
def list_points(self, start_date: datetime = None, end_date: datetime = None) -> list:
|
||||
"""
|
||||
|
@ -100,10 +89,10 @@ class Tracker:
|
|||
@classmethod
|
||||
def from_data(cls, name):
|
||||
"""Load a tracker from the DATA_DIR."""
|
||||
filepath = DATA_DIR.joinpath(f"{name}.json")
|
||||
return cls.from_json(filepath.read_text())
|
||||
data = storage.read_file(f"{name}.json")
|
||||
return cls.from_json(data)
|
||||
|
||||
@staticmethod
|
||||
def from_json(json_str):
|
||||
"""Load a tracker from a JSON string."""
|
||||
return json.loads(json_str, object_hook=object_hook)
|
||||
return json.load(json_str, object_hook=object_hook)
|
||||
|
|
|
@ -1,20 +1,19 @@
|
|||
import logging
|
||||
from typing import Optional
|
||||
import giteapy
|
||||
import json
|
||||
|
||||
from pathlib import Path
|
||||
from fastapi import APIRouter, Header, Query, HTTPException
|
||||
|
||||
ROOT_PATH = Path.home().joinpath(".radical_api/gitea")
|
||||
from core.storage import Storage
|
||||
|
||||
storage = Storage("gitea")
|
||||
|
||||
logger = logging.getLogger("gitea")
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/gitea/latest")
|
||||
def get_latest_version(base_url: Optional[str] = Header(None), author: str = Query(None), repo: str = Query(None), pre_releases: bool = Query(False)):
|
||||
with ROOT_PATH.joinpath("keys.json").open("r") as fp:
|
||||
data: dict = json.load(fp)
|
||||
data = storage.read_file("keys.json")
|
||||
if base_url in data.keys():
|
||||
config = giteapy.Configuration()
|
||||
config.api_key['access_token'] = data[base_url]
|
||||
|
|
57
routers/lights.py
Normal file
57
routers/lights.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
import logging
|
||||
|
||||
import pywizlight
|
||||
from pywizlight.scenes import get_id_from_scene_name, SCENES
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi.security.api_key import APIKey
|
||||
|
||||
from core.storage import Storage
|
||||
|
||||
from .security import get_api_key
|
||||
|
||||
logger = logging.getLogger("lights")
|
||||
router = APIRouter(prefix="/lights", tags=["lights"])
|
||||
|
||||
storage = Storage("lights")
|
||||
|
||||
async def get_lights(access_token: APIKey = Depends(get_api_key), target: str = None) -> list[pywizlight.wizlight]:
|
||||
try:
|
||||
bulb_ips = storage.read_file("lights.json")
|
||||
bulbs = []
|
||||
for ip in bulb_ips:
|
||||
if target is not None and ip != target:
|
||||
continue
|
||||
bulbs.append(pywizlight.wizlight(ip))
|
||||
if len(bulbs) == 0:
|
||||
raise ValueError
|
||||
except (FileNotFoundError, ValueError):
|
||||
storage.write_file("lights.json", await scan_lights(access_token))
|
||||
return bulbs
|
||||
|
||||
@router.get("/scan")
|
||||
async def scan_lights(access_token: APIKey = Depends(get_api_key)) -> list[str]:
|
||||
bulbs = await pywizlight.discovery.discover_lights()
|
||||
bulb_ips = []
|
||||
for bulb in bulbs:
|
||||
bulb_ips.append(bulb.ip)
|
||||
return bulb_ips
|
||||
|
||||
# make this rescan if light is not found on network
|
||||
|
||||
@router.get("/on")
|
||||
async def lights_on(scene: str = None, target: str = None, access_token: APIKey = Depends(get_api_key)) -> None:
|
||||
for light in await get_lights(access_token, target):
|
||||
if scene is not None:
|
||||
await light.turn_on(pywizlight.PilotBuilder(scene = get_id_from_scene_name(scene)))
|
||||
else:
|
||||
await light.turn_on()
|
||||
|
||||
@router.get("/off")
|
||||
async def lights_off(target: str, access_token: APIKey = Depends(get_api_key)) -> None:
|
||||
for light in await get_lights(access_token, target):
|
||||
await light.turn_off()
|
||||
|
||||
@router.get("/scenes")
|
||||
async def lights_scenes(access_token: APIKey = Depends(get_api_key)) -> None:
|
||||
return SCENES
|
|
@ -1,69 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from zipfile import ZipFile
|
||||
import json
|
||||
|
||||
from starlette.responses import FileResponse
|
||||
|
||||
|
||||
router = APIRouter(prefix="/mc_optimizer")
|
||||
ROOT_PATH = Path.home().joinpath(".radical_api/mc_optimizer")
|
||||
ROOT_PATH.mkdir(exist_ok=True)
|
||||
|
||||
class PackFile:
|
||||
def __init__(self, file: Path) -> None:
|
||||
self.file = file
|
||||
self.manifest = self.set_manifest()
|
||||
self.datetime = datetime.strptime(self.manifest['build-date'], "%d-%m-%Y-%H%M%S")
|
||||
self.build = self.manifest['build']
|
||||
|
||||
def set_manifest(self):
|
||||
zf = ZipFile(self.file)
|
||||
return json.loads(zf.read("manifest.json").decode("utf-8"))
|
||||
|
||||
def __lt__(self, other: PackFile):
|
||||
return self.datetime.__lt__(other.datetime)
|
||||
|
||||
def iterfile(path: Path):
|
||||
with path.open('rb') as fp:
|
||||
yield from fp
|
||||
|
||||
@router.get("/download/latest")
|
||||
def latest_pack():
|
||||
files = []
|
||||
for file in ROOT_PATH.iterdir():
|
||||
files.append(PackFile(file))
|
||||
files.sort()
|
||||
if len(files) == 0:
|
||||
raise HTTPException(404, "No files found.")
|
||||
return FileResponse(files[-1].file, filename=f"mc-optimizer-pack-{files[-1].build}.zip")
|
||||
|
||||
@router.get("/download/build")
|
||||
def get_build(build: int):
|
||||
for file in ROOT_PATH.iterdir():
|
||||
pf = PackFile(file)
|
||||
if pf.build == build:
|
||||
return FileResponse(pf.file, filename=f"mc-optimizer-pack-{pf.build}.zip")
|
||||
raise HTTPException(404, "Build not found.")
|
||||
|
||||
@router.get("/manifest/latest")
|
||||
def get_latest_manifest():
|
||||
files = []
|
||||
for file in ROOT_PATH.iterdir():
|
||||
files.append(PackFile(file))
|
||||
files.sort()
|
||||
if len(files) == 0:
|
||||
raise HTTPException(404, "No files found.")
|
||||
return files[-1].manifest
|
||||
|
||||
@router.get("/manifest/builds")
|
||||
def get_builds():
|
||||
builds = []
|
||||
for file in ROOT_PATH.iterdir():
|
||||
pf = PackFile(file)
|
||||
builds.append(pf.manifest)
|
||||
return builds
|
|
@ -3,14 +3,14 @@ from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, AP
|
|||
|
||||
from starlette.status import HTTP_403_FORBIDDEN
|
||||
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from core.storage import Storage
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger("security")
|
||||
ROOT_PATH = Path.home().joinpath(".radical_api/keys")
|
||||
storage = Storage("security")
|
||||
|
||||
API_KEY_NAME = "access_token"
|
||||
|
||||
|
@ -18,24 +18,17 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)
|
|||
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
|
||||
api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False)
|
||||
|
||||
system_key_path = ROOT_PATH.joinpath("system_key")
|
||||
if not system_key_path.exists():
|
||||
try:
|
||||
system_key = storage.read_file("system_key.json")["system_key"]
|
||||
except FileNotFoundError:
|
||||
logger.info("System Key not found, generating.")
|
||||
system_key_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
system_key_path.touch()
|
||||
system_uuid = str(uuid.uuid4())
|
||||
system_key_path.write_text(system_uuid)
|
||||
storage.write_file("system_key.json", {"system_key": system_uuid})
|
||||
logger.info(f"System UUID: {system_uuid}")
|
||||
|
||||
|
||||
def load_keys():
|
||||
path = ROOT_PATH.joinpath("api_keys.json")
|
||||
if not path.exists():
|
||||
path.touch()
|
||||
path.write_text("{}")
|
||||
with path.open("r") as f:
|
||||
data = json.load(f)
|
||||
return data
|
||||
return storage.read_file("api_keys.json")
|
||||
|
||||
|
||||
def get_system_key(
|
||||
|
@ -43,12 +36,12 @@ def get_system_key(
|
|||
api_key_header: str = Security(api_key_header),
|
||||
api_key_cookie: str = Security(api_key_cookie)
|
||||
):
|
||||
path = ROOT_PATH.joinpath("system_key")
|
||||
if api_key_query == path.read_text():
|
||||
system_key
|
||||
if api_key_query == system_key:
|
||||
return api_key_query
|
||||
elif api_key_header == path.read_text():
|
||||
elif api_key_header == system_key:
|
||||
return api_key_header
|
||||
elif api_key_cookie == path.read_text():
|
||||
elif api_key_cookie == system_key:
|
||||
return api_key_cookie
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
@ -75,13 +68,11 @@ def get_api_key(
|
|||
|
||||
@router.get("/key/create")
|
||||
def create_key(name, system_key: APIKey = Depends(get_system_key)):
|
||||
keys: dict = load_keys()
|
||||
keys: dict = storage.read_file("api_keys.json")
|
||||
new_key = str(uuid.uuid4())
|
||||
keys[new_key] = {}
|
||||
keys[new_key]["name"] = name
|
||||
path = ROOT_PATH.joinpath("api_keys.json")
|
||||
with path.open("w") as f:
|
||||
json.dump(keys, f, indent=4, sort_keys=True)
|
||||
storage.write_file("api_keys.json", keys)
|
||||
return new_key
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue