Improve storage management

- Unified all storage access to a single class
- Implemented new storage access to all routers in the API.
- Fixes #1
This commit is contained in:
Riley Housden 2022-03-12 23:50:33 -05:00
parent d001c10efd
commit 523baf7b57
Signed by: InValidFire
GPG key ID: 0D6208F6DF56B4D8
5 changed files with 98 additions and 71 deletions

View file

@ -1,20 +1,19 @@
import logging
from typing import Optional
import giteapy
import json
from pathlib import Path
from fastapi import APIRouter, Header, Query, HTTPException
ROOT_PATH = Path.home().joinpath(".radical_api/gitea")
from core.storage import Storage
storage = Storage("gitea")
logger = logging.getLogger("gitea")
router = APIRouter()
@router.get("/gitea/latest")
def get_latest_version(base_url: Optional[str] = Header(None), author: str = Query(None), repo: str = Query(None), pre_releases: bool = Query(False)):
with ROOT_PATH.joinpath("keys.json").open("r") as fp:
data: dict = json.load(fp)
data = storage.read_file("keys.json")
if base_url in data.keys():
config = giteapy.Configuration()
config.api_key['access_token'] = data[base_url]

View file

@ -1,44 +1,57 @@
import logging
from pathlib import Path
import pywizlight
from pywizlight.scenes import get_id_from_scene_name
from pywizlight.scenes import get_id_from_scene_name, SCENES
from fastapi import APIRouter, Depends
from fastapi.security.api_key import APIKey
from core.storage import Storage
from .security import get_api_key
logger = logging.getLogger("lights")
router = APIRouter(prefix="/lights", tags=["lights"])
# TODO: improve resource locating for API
ROOT_PATH = Path.home().joinpath(".radical_api/lights")
ROOT_PATH.mkdir(parents=True, exist_ok=True)
storage = Storage("lights")
# make this rescan if light is not found on network
@router.get("/scan")
async def get_lights(force: bool = False, access_token: APIKey = Depends(get_api_key)) -> list[pywizlight.wizlight]:
if not ROOT_PATH.joinpath("lights.json").exists() or force:
bulbs = await pywizlight.discovery.discover_lights()
with ROOT_PATH.joinpath("lights.json").open("w+") as f:
for bulb in bulbs:
f.write(f"{bulb.ip}\n")
else:
async def get_lights(access_token: APIKey = Depends(get_api_key), target: str = None) -> list[pywizlight.wizlight]:
try:
bulb_ips = storage.read_file("lights.json")
bulbs = []
for line in ROOT_PATH.joinpath("lights.json").read_text().split():
bulbs.append(pywizlight.wizlight(line))
for ip in bulb_ips:
if target is not None and ip != target:
continue
bulbs.append(pywizlight.wizlight(ip))
if len(bulbs) == 0:
raise ValueError
except (FileNotFoundError, ValueError):
storage.write_file("lights.json", await scan_lights(access_token))
return bulbs
@router.get("/scan")
async def scan_lights(access_token: APIKey = Depends(get_api_key)) -> list[str]:
bulbs = await pywizlight.discovery.discover_lights()
bulb_ips = []
for bulb in bulbs:
bulb_ips.append(bulb.ip)
return bulb_ips
# make this rescan if light is not found on network
@router.get("/on")
async def lights_on(scene: str = None, access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token):
async def lights_on(scene: str = None, target: str = None, access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token, target):
if scene is not None:
await light.turn_on(pywizlight.PilotBuilder(scene = get_id_from_scene_name(scene)))
else:
await light.turn_on()
@router.get("/off")
async def lights_off(access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token):
await light.turn_off()
async def lights_off(target: str, access_token: APIKey = Depends(get_api_key)) -> None:
for light in await get_lights(access_token, target):
await light.turn_off()
@router.get("/scenes")
async def lights_scenes(access_token: APIKey = Depends(get_api_key)) -> None:
return SCENES

View file

@ -3,14 +3,14 @@ from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, AP
from starlette.status import HTTP_403_FORBIDDEN
from pathlib import Path
import logging
import uuid
import json
from core.storage import Storage
router = APIRouter()
logger = logging.getLogger("security")
ROOT_PATH = Path.home().joinpath(".radical_api/keys")
storage = Storage("security")
API_KEY_NAME = "access_token"
@ -18,24 +18,17 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False)
system_key_path = ROOT_PATH.joinpath("system_key")
if not system_key_path.exists():
try:
system_key = storage.read_file("system_key.json")["system_key"]
except FileNotFoundError:
logger.info("System Key not found, generating.")
system_key_path.parent.mkdir(parents=True, exist_ok=True)
system_key_path.touch()
system_uuid = str(uuid.uuid4())
system_key_path.write_text(system_uuid)
storage.write_file("system_key.json", {"system_key": system_uuid})
logger.info(f"System UUID: {system_uuid}")
def load_keys():
path = ROOT_PATH.joinpath("api_keys.json")
if not path.exists():
path.touch()
path.write_text("{}")
with path.open("r") as f:
data = json.load(f)
return data
return storage.read_file("api_keys.json")
def get_system_key(
@ -43,12 +36,12 @@ def get_system_key(
api_key_header: str = Security(api_key_header),
api_key_cookie: str = Security(api_key_cookie)
):
path = ROOT_PATH.joinpath("system_key")
if api_key_query == path.read_text():
system_key
if api_key_query == system_key:
return api_key_query
elif api_key_header == path.read_text():
elif api_key_header == system_key:
return api_key_header
elif api_key_cookie == path.read_text():
elif api_key_cookie == system_key:
return api_key_cookie
else:
raise HTTPException(
@ -75,13 +68,11 @@ def get_api_key(
@router.get("/key/create")
def create_key(name, system_key: APIKey = Depends(get_system_key)):
keys: dict = load_keys()
keys: dict = storage.read_file("api_keys.json")
new_key = str(uuid.uuid4())
keys[new_key] = {}
keys[new_key]["name"] = name
path = ROOT_PATH.joinpath("api_keys.json")
with path.open("w") as f:
json.dump(keys, f, indent=4, sort_keys=True)
storage.write_file("api_keys.json", keys)
return new_key