Adds backup and storage libraries to ease backing up of files on command.
This commit is contained in:
Riley Housden 2022-07-04 01:36:36 -04:00
parent eedecb34d5
commit 067ab48de0
Signed by: InValidFire
GPG Key ID: 0D6208F6DF56B4D8
2 changed files with 266 additions and 0 deletions

129
lib/backup.py Normal file
View File

@ -0,0 +1,129 @@
"""
Backup Module:
Classes:
- BackupManager
"""
from datetime import datetime
from pathlib import Path
from zipfile import ZipFile
from . import Storage
__all__ = ["BackupManager"]
class BackupManager:
"""
Facilitates backup creation. Stores all backups in ~/.storage/backups
Methods:
- create_backup()
- get_backups()
- get_delete_candidates()
- delete_excess_backups()
"""
def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"),
date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None:
"""
Test
"""
self.target_path = target_path
self.backup_storage = backup_storage
self.date_format = date_format
self.separator = separator
@property
def target_path(self) -> Path:
"""
Target to create backups of. This could be a file or folder.
"""
return self._target_path
@target_path.setter
def target_path(self, new_path: Path):
if isinstance(new_path, Path):
if new_path.exists() and (new_path.is_dir() or new_path.is_file()):
self._target_path = new_path
else:
raise ValueError(new_path)
else:
raise TypeError(new_path)
@property
def backup_storage(self) -> Storage:
"""
Storage object to store created backups in.
"""
return self._backup_storage
@backup_storage.setter
def backup_storage(self, new_storage: Storage):
if isinstance(new_storage, Storage):
self._backup_storage = new_storage
else:
raise TypeError(new_storage)
@property
def date_format(self) -> str:
"""The datetime format string used to date the backups."""
return self._date_format
@date_format.setter
def date_format(self, new_format):
if isinstance(new_format, str):
self._date_format = new_format
else:
raise TypeError(new_format)
@property
def separator(self) -> str:
"""The separator that separates the archive name from the date"""
return self._separator
@separator.setter
def separator(self, new_separator):
if isinstance(new_separator, str):
if new_separator not in self.target_path.name and new_separator not in self.date_format:
self._separator = new_separator
else:
raise ValueError(new_separator)
else:
raise TypeError(new_separator)
def create_backup(self):
"""
Create a backup of the target path, stored in the backup storage folder.
"""
date_string = datetime.now().strftime(self.date_format)
backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip"
backup_path = self.backup_storage.get_file(backup_name)
with ZipFile(backup_path, mode="w") as zip_file:
for item in self.target_path.glob("**/*"):
zip_file.write(item, item.relative_to(self.target_path))
def get_backups(self):
"""
Get all backups found in the given folder.
"""
return self.backup_storage.list_files(f"{self.target_path.stem}{self.separator}*.zip")
def get_delete_candidates(self, max_backup_count) -> list[Path]:
"""
Get all candidates for deletion with the given max_backup_count.
If none are available for deletion, returns None.
"""
def get_date(file: Path) -> datetime:
"""
Turns the datetime string in the file name into a datetime object.
"""
date_string = file.name.split(self.separator)[1].replace(file.suffix, "")
return datetime.strptime(date_string, self.date_format)
backups = self.get_backups()
backups.sort(key=get_date)
return backups[:(len(backups)-max_backup_count)] # returns the oldest excess backups
def delete_excess_backups(self, max_backup_count: int):
"""Delete all excess backups"""
for file in self.get_delete_candidates(max_backup_count):
file.unlink()

137
lib/storage.py Normal file
View File

@ -0,0 +1,137 @@
"""
Storage Module:
Classes:
- Storage
"""
from __future__ import annotations
import json
import shutil
from pathlib import Path
__all__ = ['Storage']
class Storage:
"""
Unifies all filesystem access methods under a single class.
Minimizes room for mistakes and improves reusability.
Methods:
- get_file()
- get_folder()
- write_file()
- read_file()
- delete_file()
- rename_file()
- add_file()
- list_files()
"""
def __init__(self, folder: str, root_folder: str = ".storage") -> None:
"""
Create a new storage instance, it automatically creates and manages a folder in the user's home directory, all you have to do is supply a subfolder to store files in.
Args:
folder (str): the name of the folder to use.
"""
self.root = Path.home().joinpath(root_folder)
self.folder = self.root.joinpath(folder)
self.folder.mkdir(exist_ok=True, parents=True)
def get_file(self, path: str) -> Path:
"""Get the fully qualified path of a file in the folder.
Args:
path (str): the name of the file to grab.
Returns:
Path: the path of the file.
"""
return self.folder.joinpath(path)
def get_folder(self, name: str) -> Storage:
"""
Get a folder inside the Storage directory.
Returns a Storage object representing that folder.
"""
return Storage(name, root_folder=self.folder.name)
def write_file(self, name: str, data: dict):
"""Write data to the given file in JSON format.
Args:
name (str): the name of the file.
data (dict): the data to write.
"""
file = self.get_file(name)
file.touch(exist_ok=True)
with file.open("w+", encoding="utf-8") as f:
json.dump(data, f, indent=4)
def read_file(self, name: str) -> dict:
"""Read data from the given file in JSON format.
Args:
name (str): the name of the file.
Returns:
dict: the data from the file.
"""
file = self.get_file(name)
if not file.exists():
return {}
with file.open("r+", encoding="utf-8") as f:
data = json.load(f)
return data
def delete_file(self, name: str):
"""Delete the given file from the folder.
Args:
name (str): the name of the file.
"""
file = self.get_file(name)
file.unlink(missing_ok=True)
def rename_file(self, old_name: str, new_name: str):
"""Rename a file in the folder.
Args:
old_name (str): the current name of the file.
new_name (str): the new name of the file.
"""
file = self.get_file(old_name)
new_file = self.folder.joinpath(new_name)
file.rename(new_file)
def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None):
"""Add a copy of a file to the folder.
If a binary stream is given, it is saved directly to the named location.
Args:
name (str): The name to save it under.
path (Path): The path of the file to copy from.
binary (BinaryIO): The binary stream to copy from.
"""
if path is not None and binary is not None:
raise ValueError(binary, "Cannot supply both a path and a binary stream.")
elif path is not None:
shutil.copy(path, self.folder.joinpath(name))
elif binary is not None:
with self.folder.joinpath(name).open("wb+") as f:
f.write(binary)
def list_files(self, pattern: str | None = None) -> list[Path]:
"""
Return a list of all files in the directory.
"""
files: list[Path] = []
if pattern is None:
for file in self.folder.iterdir():
files.append(file)
else:
for file in self.folder.glob(pattern):
files.append(file)
return files