diff --git a/lib/backup.py b/lib/backup.py deleted file mode 100644 index 842485c..0000000 --- a/lib/backup.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -Backup Module: - -Classes: -- BackupManager -""" -from datetime import datetime -from pathlib import Path -from zipfile import ZipFile - -from . import Storage - -__all__ = ["BackupManager"] - -class BackupManager: - """ - Facilitates backup creation. Stores all backups in ~/.storage/backups - - Methods: - - create_backup() - - get_backups() - - get_delete_candidates() - - delete_excess_backups() - """ - def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"), - date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None: - """ - Test - """ - self.target_path = target_path - self.backup_storage = backup_storage - self.date_format = date_format - self.separator = separator - - @property - def target_path(self) -> Path: - """ - Target to create backups of. This could be a file or folder. - """ - return self._target_path - - @target_path.setter - def target_path(self, new_path: Path): - if isinstance(new_path, Path): - if new_path.exists() and (new_path.is_dir() or new_path.is_file()): - self._target_path = new_path - else: - raise ValueError(new_path) - else: - raise TypeError(new_path) - - @property - def backup_storage(self) -> Storage: - """ - Storage object to store created backups in. - """ - return self._backup_storage - - @backup_storage.setter - def backup_storage(self, new_storage: Storage): - if isinstance(new_storage, Storage): - self._backup_storage = new_storage - else: - raise TypeError(new_storage) - - @property - def date_format(self) -> str: - """The datetime format string used to date the backups.""" - return self._date_format - - @date_format.setter - def date_format(self, new_format): - if isinstance(new_format, str): - self._date_format = new_format - else: - raise TypeError(new_format) - - @property - def separator(self) -> str: - """The separator that separates the archive name from the date""" - return self._separator - - @separator.setter - def separator(self, new_separator): - if isinstance(new_separator, str): - if new_separator not in self.target_path.name and new_separator not in self.date_format: - self._separator = new_separator - else: - raise ValueError(new_separator) - else: - raise TypeError(new_separator) - - def create_backup(self): - """ - Create a backup of the target path, stored in the backup storage folder. - """ - date_string = datetime.now().strftime(self.date_format) - backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip" - backup_path = self.backup_storage.get_file(backup_name) - with ZipFile(backup_path, mode="w") as zip_file: - for item in self.target_path.glob("**/*"): - zip_file.write(item, item.relative_to(self.target_path)) - - def get_backups(self): - """ - Get all backups found in the given folder. - """ - return self.backup_storage.list_files(f"{self.target_path.stem}{self.separator}*.zip") - - def get_delete_candidates(self, max_backup_count) -> list[Path]: - """ - Get all candidates for deletion with the given max_backup_count. - If none are available for deletion, returns None. - """ - def get_date(file: Path) -> datetime: - """ - Turns the datetime string in the file name into a datetime object. - """ - date_string = file.name.split(self.separator)[1].replace(file.suffix, "") - return datetime.strptime(date_string, self.date_format) - - backups = self.get_backups() - backups.sort(key=get_date) - return backups[:(len(backups)-max_backup_count)] # returns the oldest excess backups - - def delete_excess_backups(self, max_backup_count: int): - """Delete all excess backups""" - for file in self.get_delete_candidates(max_backup_count): - file.unlink() diff --git a/lib/storage.py b/lib/storage.py deleted file mode 100644 index 1aae1b3..0000000 --- a/lib/storage.py +++ /dev/null @@ -1,137 +0,0 @@ -""" -Storage Module: - -Classes: -- Storage -""" -from __future__ import annotations - -import json -import shutil - -from pathlib import Path - -__all__ = ['Storage'] - -class Storage: - """ - Unifies all filesystem access methods under a single class. - Minimizes room for mistakes and improves reusability. - - Methods: - - get_file() - - get_folder() - - write_file() - - read_file() - - delete_file() - - rename_file() - - add_file() - - list_files() - """ - - def __init__(self, folder: str, root_folder: str = ".storage") -> None: - """ - Create a new storage instance, it automatically creates and manages a folder in the user's home directory, all you have to do is supply a subfolder to store files in. - - Args: - folder (str): the name of the folder to use. - """ - self.root = Path.home().joinpath(root_folder) - self.folder = self.root.joinpath(folder) - self.folder.mkdir(exist_ok=True, parents=True) - - def get_file(self, path: str) -> Path: - """Get the fully qualified path of a file in the folder. - - Args: - path (str): the name of the file to grab. - - Returns: - Path: the path of the file. - """ - return self.folder.joinpath(path) - - def get_folder(self, name: str) -> Storage: - """ - Get a folder inside the Storage directory. - Returns a Storage object representing that folder. - """ - return Storage(name, root_folder=self.folder.name) - - def write_file(self, name: str, data: dict): - """Write data to the given file in JSON format. - - Args: - name (str): the name of the file. - data (dict): the data to write. - """ - file = self.get_file(name) - file.touch(exist_ok=True) - with file.open("w+", encoding="utf-8") as f: - json.dump(data, f, indent=4) - - def read_file(self, name: str) -> dict: - """Read data from the given file in JSON format. - - Args: - name (str): the name of the file. - - Returns: - dict: the data from the file. - """ - file = self.get_file(name) - if not file.exists(): - return {} - with file.open("r+", encoding="utf-8") as f: - data = json.load(f) - return data - - def delete_file(self, name: str): - """Delete the given file from the folder. - - Args: - name (str): the name of the file. - """ - file = self.get_file(name) - file.unlink(missing_ok=True) - - def rename_file(self, old_name: str, new_name: str): - """Rename a file in the folder. - - Args: - old_name (str): the current name of the file. - new_name (str): the new name of the file. - """ - file = self.get_file(old_name) - new_file = self.folder.joinpath(new_name) - file.rename(new_file) - - def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None): - """Add a copy of a file to the folder. - If a binary stream is given, it is saved directly to the named location. - - Args: - name (str): The name to save it under. - path (Path): The path of the file to copy from. - binary (BinaryIO): The binary stream to copy from. - """ - if path is not None and binary is not None: - raise ValueError(binary, "Cannot supply both a path and a binary stream.") - elif path is not None: - shutil.copy(path, self.folder.joinpath(name)) - elif binary is not None: - with self.folder.joinpath(name).open("wb+") as f: - f.write(binary) - - def list_files(self, pattern: str | None = None) -> list[Path]: - """ - Return a list of all files in the directory. - """ - files: list[Path] = [] - if pattern is None: - for file in self.folder.iterdir(): - files.append(file) - else: - for file in self.folder.glob(pattern): - files.append(file) - return files diff --git a/xsh/aliases.xsh b/xsh/aliases.xsh index 9766ed7..743c585 100644 --- a/xsh/aliases.xsh +++ b/xsh/aliases.xsh @@ -1,4 +1,3 @@ -import lib source ~/xsh/functions.xsh def _debug(): @@ -42,60 +41,19 @@ def _bwc(args: list): else: print(f"copied {i}") -def _backup(args: list): - from pathlib import Path - target_path = Path(args[0]) - max_backup_count: int = 3 - if len(args) == 2 and isinstance(args[1], int): - max_backup_count = int(args[1]) - bm = lib.BackupManager(target_path) - bm.create_backup() - bm.delete_excess_backups(max_backup_count) - def _mcrcon(args: list): - import json - try: - with p"~/.mcrcon.json".open("r") as fp: - servers_data = json.load(fp) - server_data = servers_data[args[0]] - $MCRCON_HOST = server_data["address"] - $MCRCON_PORT = server_data["port"] - $MCRCON_PASS = server_data["password"] - $(mcrcon @(" ".join(args[1:]))) - except IndexError: - print("Please indicate which server to connect to.") - except KeyError: - print("Server configuration not found.") - except FileNotFoundError: - print("Server configuration file '.mcrcon.json' is not found.") + if p"~/.mcrcon".exists(): + source "~/.mcrcon" + else: + raise FileNotFoundError("~/.mcrcon") + $(mcrcon @(" ".join(args))) -def _mcterm(args: list): - import json - try: - with p"~/.mcrcon.json".open("r") as fp: - servers_data = json.load(fp) - server_data = servers_data[args[0]] - $MCRCON_HOST = server_data["address"] - $MCRCON_PORT = server_data["port"] - $MCRCON_PASS = server_data["password"] - mcrcon -t - except IndexError: - print("Please indicate which server to connect to.") - except KeyError: - print("Server configuration not found.") - except FileNotFoundError: - print("Server configuration file '.mcrcon.json' is not found.") - -def _mclist(): - import json - try: - with p"~/.mcrcon.json".open("r") as fp: - servers_data = json.load(fp) - print("Servers:") - for key in servers_data: - print("\t-", key) - except FileNotFoundError: - print("Server configuration file '.mcrcon.json' is not found.") +def _mcterm(): + if p"~/.mcrcon".exists(): + source "~/.mcrcon" + else: + raise FileNotFoundError("~/.mcrcon") + mcrcon -t def _alias(): for alias in aliases: @@ -133,15 +91,12 @@ def _ls(): def load_aliases(): aliases.update({ - 'bu': _backup, - 'backup': _backup, 'bwg': _bwc, 'colortest': _colortest, 'debug': _debug, 'ls': 'ls -alhs --color=auto', 'mc': _mcrcon, 'mct': _mcterm, - 'mcl': _mclist, ':q': 'exit', 'ensure-tmux': _ensure_tmux, 'aliases': _alias,