From 067ab48de09652a3971de38c81281e7b615fe8e7 Mon Sep 17 00:00:00 2001 From: Riley Housden Date: Mon, 4 Jul 2022 01:36:36 -0400 Subject: [PATCH] Add lib Adds backup and storage libraries to ease backing up of files on command. --- lib/backup.py | 129 ++++++++++++++++++++++++++++++++++++++++++++++ lib/storage.py | 137 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 lib/backup.py create mode 100644 lib/storage.py diff --git a/lib/backup.py b/lib/backup.py new file mode 100644 index 0000000..842485c --- /dev/null +++ b/lib/backup.py @@ -0,0 +1,129 @@ +""" +Backup Module: + +Classes: +- BackupManager +""" +from datetime import datetime +from pathlib import Path +from zipfile import ZipFile + +from . import Storage + +__all__ = ["BackupManager"] + +class BackupManager: + """ + Facilitates backup creation. Stores all backups in ~/.storage/backups + + Methods: + - create_backup() + - get_backups() + - get_delete_candidates() + - delete_excess_backups() + """ + def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"), + date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None: + """ + Test + """ + self.target_path = target_path + self.backup_storage = backup_storage + self.date_format = date_format + self.separator = separator + + @property + def target_path(self) -> Path: + """ + Target to create backups of. This could be a file or folder. + """ + return self._target_path + + @target_path.setter + def target_path(self, new_path: Path): + if isinstance(new_path, Path): + if new_path.exists() and (new_path.is_dir() or new_path.is_file()): + self._target_path = new_path + else: + raise ValueError(new_path) + else: + raise TypeError(new_path) + + @property + def backup_storage(self) -> Storage: + """ + Storage object to store created backups in. + """ + return self._backup_storage + + @backup_storage.setter + def backup_storage(self, new_storage: Storage): + if isinstance(new_storage, Storage): + self._backup_storage = new_storage + else: + raise TypeError(new_storage) + + @property + def date_format(self) -> str: + """The datetime format string used to date the backups.""" + return self._date_format + + @date_format.setter + def date_format(self, new_format): + if isinstance(new_format, str): + self._date_format = new_format + else: + raise TypeError(new_format) + + @property + def separator(self) -> str: + """The separator that separates the archive name from the date""" + return self._separator + + @separator.setter + def separator(self, new_separator): + if isinstance(new_separator, str): + if new_separator not in self.target_path.name and new_separator not in self.date_format: + self._separator = new_separator + else: + raise ValueError(new_separator) + else: + raise TypeError(new_separator) + + def create_backup(self): + """ + Create a backup of the target path, stored in the backup storage folder. + """ + date_string = datetime.now().strftime(self.date_format) + backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip" + backup_path = self.backup_storage.get_file(backup_name) + with ZipFile(backup_path, mode="w") as zip_file: + for item in self.target_path.glob("**/*"): + zip_file.write(item, item.relative_to(self.target_path)) + + def get_backups(self): + """ + Get all backups found in the given folder. + """ + return self.backup_storage.list_files(f"{self.target_path.stem}{self.separator}*.zip") + + def get_delete_candidates(self, max_backup_count) -> list[Path]: + """ + Get all candidates for deletion with the given max_backup_count. + If none are available for deletion, returns None. + """ + def get_date(file: Path) -> datetime: + """ + Turns the datetime string in the file name into a datetime object. + """ + date_string = file.name.split(self.separator)[1].replace(file.suffix, "") + return datetime.strptime(date_string, self.date_format) + + backups = self.get_backups() + backups.sort(key=get_date) + return backups[:(len(backups)-max_backup_count)] # returns the oldest excess backups + + def delete_excess_backups(self, max_backup_count: int): + """Delete all excess backups""" + for file in self.get_delete_candidates(max_backup_count): + file.unlink() diff --git a/lib/storage.py b/lib/storage.py new file mode 100644 index 0000000..1aae1b3 --- /dev/null +++ b/lib/storage.py @@ -0,0 +1,137 @@ +""" +Storage Module: + +Classes: +- Storage +""" +from __future__ import annotations + +import json +import shutil + +from pathlib import Path + +__all__ = ['Storage'] + +class Storage: + """ + Unifies all filesystem access methods under a single class. + Minimizes room for mistakes and improves reusability. + + Methods: + - get_file() + - get_folder() + - write_file() + - read_file() + - delete_file() + - rename_file() + - add_file() + - list_files() + """ + + def __init__(self, folder: str, root_folder: str = ".storage") -> None: + """ + Create a new storage instance, it automatically creates and manages a folder in the user's home directory, all you have to do is supply a subfolder to store files in. + + Args: + folder (str): the name of the folder to use. + """ + self.root = Path.home().joinpath(root_folder) + self.folder = self.root.joinpath(folder) + self.folder.mkdir(exist_ok=True, parents=True) + + def get_file(self, path: str) -> Path: + """Get the fully qualified path of a file in the folder. + + Args: + path (str): the name of the file to grab. + + Returns: + Path: the path of the file. + """ + return self.folder.joinpath(path) + + def get_folder(self, name: str) -> Storage: + """ + Get a folder inside the Storage directory. + Returns a Storage object representing that folder. + """ + return Storage(name, root_folder=self.folder.name) + + def write_file(self, name: str, data: dict): + """Write data to the given file in JSON format. + + Args: + name (str): the name of the file. + data (dict): the data to write. + """ + file = self.get_file(name) + file.touch(exist_ok=True) + with file.open("w+", encoding="utf-8") as f: + json.dump(data, f, indent=4) + + def read_file(self, name: str) -> dict: + """Read data from the given file in JSON format. + + Args: + name (str): the name of the file. + + Returns: + dict: the data from the file. + """ + file = self.get_file(name) + if not file.exists(): + return {} + with file.open("r+", encoding="utf-8") as f: + data = json.load(f) + return data + + def delete_file(self, name: str): + """Delete the given file from the folder. + + Args: + name (str): the name of the file. + """ + file = self.get_file(name) + file.unlink(missing_ok=True) + + def rename_file(self, old_name: str, new_name: str): + """Rename a file in the folder. + + Args: + old_name (str): the current name of the file. + new_name (str): the new name of the file. + """ + file = self.get_file(old_name) + new_file = self.folder.joinpath(new_name) + file.rename(new_file) + + def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None): + """Add a copy of a file to the folder. + If a binary stream is given, it is saved directly to the named location. + + Args: + name (str): The name to save it under. + path (Path): The path of the file to copy from. + binary (BinaryIO): The binary stream to copy from. + """ + if path is not None and binary is not None: + raise ValueError(binary, "Cannot supply both a path and a binary stream.") + elif path is not None: + shutil.copy(path, self.folder.joinpath(name)) + elif binary is not None: + with self.folder.joinpath(name).open("wb+") as f: + f.write(binary) + + def list_files(self, pattern: str | None = None) -> list[Path]: + """ + Return a list of all files in the directory. + """ + files: list[Path] = [] + if pattern is None: + for file in self.folder.iterdir(): + files.append(file) + else: + for file in self.folder.glob(pattern): + files.append(file) + return files