import time
from datetime import datetime, timedelta
from urllib.parse import urljoin

import requests as RQ

from utils import timed_cache


class Radarr(object):
    """Thin client for the Radarr v3 HTTP API.

    On construction the client queries the server for its root folders and
    quality profiles and remembers the first of each; `add` uses them as
    defaults for newly added movies.
    """

    def __init__(self, url, api_key):
        """Store the connection settings and fetch defaults from the server.

        Args:
            url: Base URL of the Radarr instance; API paths are joined onto
                it with `urljoin`.
            api_key: API key sent as the `apikey` query parameter.

        Note:
            Performs two HTTP requests. Indexing with `[0]` fails if the
            server reports no root folder or no quality profile.
        """
        self.url = url
        self.api_key = api_key
        self.root_folder = self.get("api/v3/rootFolder")[0]["path"]
        self.quality_profile = self.get("api/v3/qualityprofile")[0]

    def get(self, url, **kwargs):
        """Issue a GET request and return the decoded response.

        Args:
            url: API path, joined onto the base URL.
            **kwargs: Extra query-string parameters.

        Returns:
            The parsed JSON body, or the raw response text when the body is
            not valid JSON.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        kwargs["apikey"] = self.api_key
        # Current Unix time in seconds; presumably a cache-buster.
        kwargs["_"] = str(int(time.time()))
        res = RQ.get(urljoin(self.url, url), params=kwargs)
        res.raise_for_status()
        try:
            return res.json()
        except ValueError:
            # JSON decode errors subclass ValueError; fall back to raw text.
            return res.text

    def search(self, query):
        """Look up movies matching the search term `query`."""
        return self.get("api/v3/movie/lookup", term=query)

    def status(self):
        """Return the server's system status."""
        return self.get("api/v3/system/status")

    @timed_cache()
    def history(self, pageSize=500):
        """Return the first page of history, newest entries first.

        Args:
            pageSize: Number of records to request for the page.
        """
        return self.get(
            "api/v3/history",
            page=1,
            # Honour the caller's value; it was previously hard-coded to 500.
            pageSize=pageSize,
            sortDirection="descending",
            sortKey="date",
        )

    @timed_cache()
    def calendar(self, days=90):
        """Return calendar entries from `days` before to `days` after today.

        Args:
            days: Half-width of the window, in days, around the current
                local date and time.
        """
        today = datetime.today()
        start = today - timedelta(days=days)
        end = today + timedelta(days=days)
        return self.get(
            "api/v3/calendar",
            unmonitored=False,
            start=start.isoformat(),
            end=end.isoformat(),
        )

    @timed_cache()
    def movies(self, movie_id=None):
        """Return all movies, or a single movie when `movie_id` is given."""
        if movie_id is None:
            return self.get("api/v3/movie")
        return self.get("api/v3/movie/{}".format(movie_id))

    @timed_cache(seconds=60)
    def queue(self, **kwargs):
        """Return every record in the download queue, following pagination.

        Args:
            **kwargs: Extra query-string parameters forwarded on each page
                request.

        Returns:
            A list with the `records` of all fetched pages concatenated.
        """
        data = []
        page = 1
        while True:
            res = self.get("api/v3/queue", page=page, pageSize=100, **kwargs)
            records = res.get("records", [])
            data += records
            page += 1
            # Stop on an empty page as well: otherwise a `totalRecords` that
            # is never reached would make this loop spin forever.
            if not records or len(data) >= res.get("totalRecords", 0):
                break
        return data

    def add(self, data):
        """Add a movie to Radarr and trigger a search for it.

        Args:
            data: Movie payload. It is modified in place: the quality
                profile, minimum availability, root folder and add options
                are set before the payload is posted.

        Returns:
            The decoded JSON response. The HTTP status is not checked here,
            so an error response body is returned as-is.
        """
        data["qualityProfileId"] = self.quality_profile["id"]
        data["minimumAvailability"] = 2  # InCinema
        data["rootFolderPath"] = self.root_folder
        data["addOptions"] = {"searchForMovie": True}
        params = dict(apikey=self.api_key)
        res = RQ.post(urljoin(self.url, "api/v3/movie"), json=data, params=params)
        return res.json()

    def log(self, limit=0):
        """Return the first page of log entries, newest first.

        Args:
            limit: Page size to request; a falsy value means 1024.
        """
        return self.get(
            "api/v3/log",
            page=1,
            pageSize=(limit or 1024),
            sortDirection="descending",
            sortKey="time",
        )

    def test(self):
        """Check connectivity by requesting the system status.

        Returns:
            An empty dict when the status request succeeds; any exception
            from the request propagates to the caller.
        """
        self.status()
        return {}