API Security, Logging, & Restructure
- Added a .gitignore - Implemented security.py (apikeys.py from server-api) - Implemented wrapper.py (wrapper from radicalbot - Added proper logging using log.conf - Split tracker API endpoints into a router - Renamed Target classes to Tracker
This commit is contained in:
parent
13075f7fa3
commit
f5e0ac8cac
11 changed files with 278 additions and 111 deletions
0
routers/__init__.py
Normal file
0
routers/__init__.py
Normal file
91
routers/security.py
Normal file
91
routers/security.py
Normal file
|
@ -0,0 +1,91 @@
|
|||
from fastapi import Security, HTTPException, APIRouter, Depends
|
||||
from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, APIKey
|
||||
|
||||
from starlette.status import HTTP_403_FORBIDDEN
|
||||
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import uuid
|
||||
import json
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger("security")
|
||||
ROOT_PATH = Path.home().joinpath(".tracker_api/keys")
|
||||
|
||||
API_KEY_NAME = "access_token"
|
||||
|
||||
api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)
|
||||
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
|
||||
api_key_cookie = APIKeyCookie(name=API_KEY_NAME, auto_error=False)
|
||||
|
||||
system_key_path = ROOT_PATH.joinpath("system_key")
|
||||
if not system_key_path.exists():
|
||||
logger.info("System Key not found, generating.")
|
||||
system_key_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
system_key_path.touch()
|
||||
system_uuid = str(uuid.uuid4())
|
||||
system_key_path.write_text(system_uuid)
|
||||
logger.info(f"System UUID: {system_uuid}")
|
||||
|
||||
|
||||
def load_keys():
|
||||
path = ROOT_PATH.joinpath("api_keys.json")
|
||||
if not path.exists():
|
||||
path.touch()
|
||||
path.write_text("{}")
|
||||
with path.open("r") as f:
|
||||
data = json.load(f)
|
||||
return data
|
||||
|
||||
|
||||
def get_system_key(
|
||||
api_key_query: str = Security(api_key_query),
|
||||
api_key_header: str = Security(api_key_header),
|
||||
api_key_cookie: str = Security(api_key_cookie)
|
||||
):
|
||||
path = ROOT_PATH.joinpath("system_key")
|
||||
if api_key_query == path.read_text():
|
||||
return api_key_query
|
||||
elif api_key_header == path.read_text():
|
||||
return api_key_header
|
||||
elif api_key_cookie == path.read_text():
|
||||
return api_key_cookie
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
|
||||
)
|
||||
|
||||
|
||||
def get_api_key(
|
||||
api_key_query: str = Security(api_key_query),
|
||||
api_key_header: str = Security(api_key_header),
|
||||
api_key_cookie: str = Security(api_key_cookie)
|
||||
):
|
||||
if api_key_query in load_keys():
|
||||
return api_key_query
|
||||
elif api_key_header in load_keys():
|
||||
return api_key_header
|
||||
elif api_key_cookie in load_keys():
|
||||
return api_key_header
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
|
||||
)
|
||||
|
||||
|
||||
@router.get("/key/create")
|
||||
def create_key(name, system_key: APIKey = Depends(get_system_key)):
|
||||
keys: dict = load_keys()
|
||||
new_key = str(uuid.uuid4())
|
||||
keys[new_key] = {}
|
||||
keys[new_key]["name"] = name
|
||||
path = ROOT_PATH.joinpath("api_keys.json")
|
||||
with path.open("w") as f:
|
||||
json.dump(keys, f, indent=4, sort_keys=True)
|
||||
return new_key
|
||||
|
||||
|
||||
@router.get("/key/me")
|
||||
def key_info(api_key: APIKey = Depends(get_api_key)):
|
||||
data = load_keys()
|
||||
return data[api_key]
|
56
routers/tracker.py
Normal file
56
routers/tracker.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
from datetime import datetime
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi.security.api_key import APIKey
|
||||
|
||||
from .security import get_api_key
|
||||
from core.tracker import Tracker
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/tracker/points")
|
||||
def list_points(name: str, access_token: APIKey = Depends(get_api_key)):
|
||||
try:
|
||||
tracker = Tracker.from_data(name)
|
||||
return tracker.points
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
|
||||
@router.get("/tracker/points/add")
|
||||
def add_point(name: str, value: int, date_str: str = datetime.now().isoformat(),
|
||||
access_token: APIKey = Depends(get_api_key)):
|
||||
try:
|
||||
tracker = Tracker.from_data(name)
|
||||
except FileNotFoundError:
|
||||
tracker = Tracker(name, [])
|
||||
tracker.add_point(date_str, value)
|
||||
return tracker.to_json()
|
||||
|
||||
|
||||
@router.put("/tracker/points/modify")
|
||||
def modify_point(name: str, date_str: str, value: int, access_token: APIKey = Depends(get_api_key)):
|
||||
tracker = Tracker.from_data(name)
|
||||
tracker.modify_point(date_str, value)
|
||||
return 200
|
||||
|
||||
|
||||
@router.delete("/tracker/points/delete")
|
||||
def delete_point(name: str, date_str: str, access_token: APIKey = Depends(get_api_key)):
|
||||
tracker = Tracker.from_data(name)
|
||||
tracker.delete_point(date_str)
|
||||
return 200
|
||||
|
||||
|
||||
@router.get("/tracker/rename")
|
||||
def rename_tracker(name: str, new_name: str, access_token: APIKey = Depends(get_api_key)):
|
||||
tracker = Tracker.from_data(name)
|
||||
tracker.rename(new_name)
|
||||
return 200
|
||||
|
||||
|
||||
@router.delete("/tracker/delete")
|
||||
def delete_tracker(name: str, access_token: APIKey = Depends(get_api_key)):
|
||||
tracker = Tracker.from_data(name)
|
||||
tracker.delete()
|
Loading…
Add table
Add a link
Reference in a new issue