Compare commits

...

2 commits

Author SHA1 Message Date
44555fc84e
add basic backup alias 2022-07-04 01:38:57 -04:00
067ab48de0
Add lib
Adds backup and storage libraries to ease backing up of files on command.
2022-07-04 01:36:36 -04:00
3 changed files with 323 additions and 12 deletions

129
lib/backup.py Normal file
View file

@ -0,0 +1,129 @@
"""
Backup Module:
Classes:
- BackupManager
"""
from datetime import datetime
from pathlib import Path
from zipfile import ZipFile
from . import Storage
__all__ = ["BackupManager"]
class BackupManager:
"""
Facilitates backup creation. Stores all backups in ~/.storage/backups
Methods:
- create_backup()
- get_backups()
- get_delete_candidates()
- delete_excess_backups()
"""
def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"),
date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None:
"""
Test
"""
self.target_path = target_path
self.backup_storage = backup_storage
self.date_format = date_format
self.separator = separator
@property
def target_path(self) -> Path:
"""
Target to create backups of. This could be a file or folder.
"""
return self._target_path
@target_path.setter
def target_path(self, new_path: Path):
if isinstance(new_path, Path):
if new_path.exists() and (new_path.is_dir() or new_path.is_file()):
self._target_path = new_path
else:
raise ValueError(new_path)
else:
raise TypeError(new_path)
@property
def backup_storage(self) -> Storage:
"""
Storage object to store created backups in.
"""
return self._backup_storage
@backup_storage.setter
def backup_storage(self, new_storage: Storage):
if isinstance(new_storage, Storage):
self._backup_storage = new_storage
else:
raise TypeError(new_storage)
@property
def date_format(self) -> str:
"""The datetime format string used to date the backups."""
return self._date_format
@date_format.setter
def date_format(self, new_format):
if isinstance(new_format, str):
self._date_format = new_format
else:
raise TypeError(new_format)
@property
def separator(self) -> str:
"""The separator that separates the archive name from the date"""
return self._separator
@separator.setter
def separator(self, new_separator):
if isinstance(new_separator, str):
if new_separator not in self.target_path.name and new_separator not in self.date_format:
self._separator = new_separator
else:
raise ValueError(new_separator)
else:
raise TypeError(new_separator)
def create_backup(self):
"""
Create a backup of the target path, stored in the backup storage folder.
"""
date_string = datetime.now().strftime(self.date_format)
backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip"
backup_path = self.backup_storage.get_file(backup_name)
with ZipFile(backup_path, mode="w") as zip_file:
for item in self.target_path.glob("**/*"):
zip_file.write(item, item.relative_to(self.target_path))
def get_backups(self):
"""
Get all backups found in the given folder.
"""
return self.backup_storage.list_files(f"{self.target_path.stem}{self.separator}*.zip")
def get_delete_candidates(self, max_backup_count) -> list[Path]:
"""
Get all candidates for deletion with the given max_backup_count.
If none are available for deletion, returns None.
"""
def get_date(file: Path) -> datetime:
"""
Turns the datetime string in the file name into a datetime object.
"""
date_string = file.name.split(self.separator)[1].replace(file.suffix, "")
return datetime.strptime(date_string, self.date_format)
backups = self.get_backups()
backups.sort(key=get_date)
return backups[:(len(backups)-max_backup_count)] # returns the oldest excess backups
def delete_excess_backups(self, max_backup_count: int):
"""Delete all excess backups"""
for file in self.get_delete_candidates(max_backup_count):
file.unlink()

137
lib/storage.py Normal file
View file

@ -0,0 +1,137 @@
"""
Storage Module:
Classes:
- Storage
"""
from __future__ import annotations
import json
import shutil
from pathlib import Path
__all__ = ['Storage']
class Storage:
"""
Unifies all filesystem access methods under a single class.
Minimizes room for mistakes and improves reusability.
Methods:
- get_file()
- get_folder()
- write_file()
- read_file()
- delete_file()
- rename_file()
- add_file()
- list_files()
"""
def __init__(self, folder: str, root_folder: str = ".storage") -> None:
"""
Create a new storage instance, it automatically creates and manages a folder in the user's home directory, all you have to do is supply a subfolder to store files in.
Args:
folder (str): the name of the folder to use.
"""
self.root = Path.home().joinpath(root_folder)
self.folder = self.root.joinpath(folder)
self.folder.mkdir(exist_ok=True, parents=True)
def get_file(self, path: str) -> Path:
"""Get the fully qualified path of a file in the folder.
Args:
path (str): the name of the file to grab.
Returns:
Path: the path of the file.
"""
return self.folder.joinpath(path)
def get_folder(self, name: str) -> Storage:
"""
Get a folder inside the Storage directory.
Returns a Storage object representing that folder.
"""
return Storage(name, root_folder=self.folder.name)
def write_file(self, name: str, data: dict):
"""Write data to the given file in JSON format.
Args:
name (str): the name of the file.
data (dict): the data to write.
"""
file = self.get_file(name)
file.touch(exist_ok=True)
with file.open("w+", encoding="utf-8") as f:
json.dump(data, f, indent=4)
def read_file(self, name: str) -> dict:
"""Read data from the given file in JSON format.
Args:
name (str): the name of the file.
Returns:
dict: the data from the file.
"""
file = self.get_file(name)
if not file.exists():
return {}
with file.open("r+", encoding="utf-8") as f:
data = json.load(f)
return data
def delete_file(self, name: str):
"""Delete the given file from the folder.
Args:
name (str): the name of the file.
"""
file = self.get_file(name)
file.unlink(missing_ok=True)
def rename_file(self, old_name: str, new_name: str):
"""Rename a file in the folder.
Args:
old_name (str): the current name of the file.
new_name (str): the new name of the file.
"""
file = self.get_file(old_name)
new_file = self.folder.joinpath(new_name)
file.rename(new_file)
def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None):
"""Add a copy of a file to the folder.
If a binary stream is given, it is saved directly to the named location.
Args:
name (str): The name to save it under.
path (Path): The path of the file to copy from.
binary (BinaryIO): The binary stream to copy from.
"""
if path is not None and binary is not None:
raise ValueError(binary, "Cannot supply both a path and a binary stream.")
elif path is not None:
shutil.copy(path, self.folder.joinpath(name))
elif binary is not None:
with self.folder.joinpath(name).open("wb+") as f:
f.write(binary)
def list_files(self, pattern: str | None = None) -> list[Path]:
"""
Return a list of all files in the directory.
"""
files: list[Path] = []
if pattern is None:
for file in self.folder.iterdir():
files.append(file)
else:
for file in self.folder.glob(pattern):
files.append(file)
return files

View file

@ -1,3 +1,4 @@
import lib
source ~/xsh/functions.xsh
def _debug():
@ -41,19 +42,60 @@ def _bwc(args: list):
else:
print(f"copied {i}")
def _mcrcon(args: list):
if p"~/.mcrcon".exists():
source "~/.mcrcon"
else:
raise FileNotFoundError("~/.mcrcon")
$(mcrcon @(" ".join(args)))
def _backup(args: list):
from pathlib import Path
target_path = Path(args[0])
max_backup_count: int = 3
if len(args) == 2 and isinstance(args[1], int):
max_backup_count = int(args[1])
bm = lib.BackupManager(target_path)
bm.create_backup()
bm.delete_excess_backups(max_backup_count)
def _mcterm():
if p"~/.mcrcon".exists():
source "~/.mcrcon"
else:
raise FileNotFoundError("~/.mcrcon")
def _mcrcon(args: list):
import json
try:
with p"~/.mcrcon.json".open("r") as fp:
servers_data = json.load(fp)
server_data = servers_data[args[0]]
$MCRCON_HOST = server_data["address"]
$MCRCON_PORT = server_data["port"]
$MCRCON_PASS = server_data["password"]
$(mcrcon @(" ".join(args[1:])))
except IndexError:
print("Please indicate which server to connect to.")
except KeyError:
print("Server configuration not found.")
except FileNotFoundError:
print("Server configuration file '.mcrcon.json' is not found.")
def _mcterm(args: list):
import json
try:
with p"~/.mcrcon.json".open("r") as fp:
servers_data = json.load(fp)
server_data = servers_data[args[0]]
$MCRCON_HOST = server_data["address"]
$MCRCON_PORT = server_data["port"]
$MCRCON_PASS = server_data["password"]
mcrcon -t
except IndexError:
print("Please indicate which server to connect to.")
except KeyError:
print("Server configuration not found.")
except FileNotFoundError:
print("Server configuration file '.mcrcon.json' is not found.")
def _mclist():
import json
try:
with p"~/.mcrcon.json".open("r") as fp:
servers_data = json.load(fp)
print("Servers:")
for key in servers_data:
print("\t-", key)
except FileNotFoundError:
print("Server configuration file '.mcrcon.json' is not found.")
def _alias():
for alias in aliases:
@ -91,12 +133,15 @@ def _ls():
def load_aliases():
aliases.update({
'bu': _backup,
'backup': _backup,
'bwg': _bwc,
'colortest': _colortest,
'debug': _debug,
'ls': 'ls -alhs --color=auto',
'mc': _mcrcon,
'mct': _mcterm,
'mcl': _mclist,
':q': 'exit',
'ensure-tmux': _ensure_tmux,
'aliases': _alias,