Compare commits
2 commits: eedecb34d5...44555fc84e

Commits:
- 44555fc84e
- 067ab48de0

3 changed files with 323 additions and 12 deletions
lib/backup.py (new file, 129 lines)
@@ -0,0 +1,129 @@
"""
Backup Module:

Classes:
- BackupManager
"""
from datetime import datetime
from pathlib import Path
from zipfile import ZipFile

from . import Storage

__all__ = ["BackupManager"]


class BackupManager:
    """
    Facilitates backup creation. Stores all backups in ~/.storage/backups

    Methods:
    - create_backup()
    - get_backups()
    - get_delete_candidates()
    - delete_excess_backups()
    """
    def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"),
                 date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None:
"""
|
||||
Test
|
||||
"""
        self.target_path = target_path
        self.backup_storage = backup_storage
        self.date_format = date_format
        self.separator = separator

    @property
    def target_path(self) -> Path:
        """
        Target to create backups of. This could be a file or folder.
        """
        return self._target_path

    @target_path.setter
    def target_path(self, new_path: Path):
        if isinstance(new_path, Path):
            if new_path.exists() and (new_path.is_dir() or new_path.is_file()):
                self._target_path = new_path
            else:
                raise ValueError(new_path)
        else:
            raise TypeError(new_path)

    @property
    def backup_storage(self) -> Storage:
        """
        Storage object to store created backups in.
        """
        return self._backup_storage

    @backup_storage.setter
    def backup_storage(self, new_storage: Storage):
        if isinstance(new_storage, Storage):
            self._backup_storage = new_storage
        else:
            raise TypeError(new_storage)

    @property
    def date_format(self) -> str:
        """The datetime format string used to date the backups."""
        return self._date_format

    @date_format.setter
    def date_format(self, new_format):
        if isinstance(new_format, str):
            self._date_format = new_format
        else:
            raise TypeError(new_format)

    @property
    def separator(self) -> str:
        """The separator that separates the archive name from the date"""
        return self._separator

    @separator.setter
    def separator(self, new_separator):
        if isinstance(new_separator, str):
            if new_separator not in self.target_path.name and new_separator not in self.date_format:
                self._separator = new_separator
            else:
                raise ValueError(new_separator)
        else:
            raise TypeError(new_separator)

    def create_backup(self):
        """
        Create a backup of the target path, stored in the backup storage folder.
        """
        date_string = datetime.now().strftime(self.date_format)
        backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip"
        backup_path = self.backup_storage.get_file(backup_name)
        with ZipFile(backup_path, mode="w") as zip_file:
            if self.target_path.is_file():
                # A single-file target has nothing to glob; archive the file itself.
                zip_file.write(self.target_path, self.target_path.name)
            else:
                for item in self.target_path.glob("**/*"):
                    zip_file.write(item, item.relative_to(self.target_path))

    def get_backups(self):
        """
        Get all backups found in the given folder.
        """
        # Match the archive names produced by create_backup().
        return self.backup_storage.list_files(f"{self.target_path.name}{self.separator}*.zip")

    def get_delete_candidates(self, max_backup_count: int) -> list[Path]:
        """
        Get all candidates for deletion with the given max_backup_count.
        If nothing needs to be deleted, an empty list is returned.
        """
        def get_date(file: Path) -> datetime:
            """
            Turns the datetime string in the file name into a datetime object.
            """
            date_string = file.name.split(self.separator)[1].replace(file.suffix, "")
            return datetime.strptime(date_string, self.date_format)

        backups = self.get_backups()
        backups.sort(key=get_date)
        # Only the oldest backups beyond max_backup_count are candidates; clamp the
        # slice so nothing is returned while the backup count is below the limit.
        return backups[:max(0, len(backups) - max_backup_count)]

    def delete_excess_backups(self, max_backup_count: int):
        """Delete all excess backups"""
        for file in self.get_delete_candidates(max_backup_count):
            file.unlink()
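
For orientation, a minimal usage sketch of the new BackupManager (this assumes the package is importable as lib, as the alias file below does; the notes folder is a made-up example):

from pathlib import Path

from lib import BackupManager

manager = BackupManager(Path.home() / "notes")  # hypothetical target folder
manager.create_backup()                         # writes ~/.storage/backups/notes-<date>.zip
manager.delete_excess_backups(3)                # keeps only the three newest archives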
lib/storage.py (new file, 137 lines)
@@ -0,0 +1,137 @@
"""
Storage Module:

Classes:
- Storage
"""
from __future__ import annotations

import json
import shutil

from pathlib import Path

__all__ = ['Storage']


class Storage:
    """
    Unifies all filesystem access methods under a single class.
    Minimizes room for mistakes and improves reusability.

    Methods:
    - get_file()
    - get_folder()
    - write_file()
    - read_file()
    - delete_file()
    - rename_file()
    - add_file()
    - list_files()
    """

    def __init__(self, folder: str, root_folder: str = ".storage") -> None:
        """
        Create a new storage instance. It automatically creates and manages a folder
        in the user's home directory; all you have to do is supply a subfolder to store files in.

        Args:
            folder (str): the name of the subfolder to use.
            root_folder (str): the root folder inside the home directory.
        """
        self.root = Path.home().joinpath(root_folder)
        self.folder = self.root.joinpath(folder)
        self.folder.mkdir(exist_ok=True, parents=True)

    def get_file(self, path: str) -> Path:
        """Get the fully qualified path of a file in the folder.

        Args:
            path (str): the name of the file to grab.

        Returns:
            Path: the path of the file.
        """
        return self.folder.joinpath(path)

    def get_folder(self, name: str) -> Storage:
        """
        Get a folder inside the Storage directory.
        Returns a Storage object representing that folder.
        """
        # Pass the whole path relative to the home directory, not just its last
        # component, so the subfolder is created inside this Storage folder.
        return Storage(name, root_folder=str(self.folder.relative_to(Path.home())))

    def write_file(self, name: str, data: dict):
        """Write data to the given file in JSON format.

        Args:
            name (str): the name of the file.
            data (dict): the data to write.
        """
        file = self.get_file(name)
        file.touch(exist_ok=True)
        with file.open("w+", encoding="utf-8") as f:
            json.dump(data, f, indent=4)

    def read_file(self, name: str) -> dict:
        """Read data from the given file in JSON format.

        Args:
            name (str): the name of the file.

        Returns:
            dict: the data from the file.
        """
        file = self.get_file(name)
        if not file.exists():
            return {}
        with file.open("r+", encoding="utf-8") as f:
            data = json.load(f)
        return data

    def delete_file(self, name: str):
        """Delete the given file from the folder.

        Args:
            name (str): the name of the file.
        """
        file = self.get_file(name)
        file.unlink(missing_ok=True)

    def rename_file(self, old_name: str, new_name: str):
        """Rename a file in the folder.

        Args:
            old_name (str): the current name of the file.
            new_name (str): the new name of the file.
        """
        file = self.get_file(old_name)
        new_file = self.folder.joinpath(new_name)
        file.rename(new_file)

    def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None):
        """Add a copy of a file to the folder.
        If raw bytes are given, they are saved directly to the named location.

        Args:
            name (str): The name to save it under.
            path (Path): The path of the file to copy from.
            binary (bytes): The raw bytes to copy from.
        """
        if path is not None and binary is not None:
            raise ValueError(binary, "Cannot supply both a path and a binary stream.")
        elif path is not None:
            shutil.copy(path, self.folder.joinpath(name))
        elif binary is not None:
            with self.folder.joinpath(name).open("wb+") as f:
                f.write(binary)

    def list_files(self, pattern: str | None = None) -> list[Path]:
        """
        Return a list of all files in the directory,
        optionally filtered by a glob pattern.
        """
        files: list[Path] = []
        if pattern is None:
            for file in self.folder.iterdir():
                files.append(file)
        else:
            for file in self.folder.glob(pattern):
                files.append(file)
        return files
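
Similarly, a quick sketch of how Storage is meant to be used (the folder and file names here are invented for illustration):

from lib import Storage

settings = Storage("settings")                          # hypothetical folder under ~/.storage
settings.write_file("config.json", {"theme": "dark"})   # dump a dict as JSON
print(settings.read_file("config.json"))                # -> {'theme': 'dark'}
print(settings.list_files("*.json"))                    # Paths matching the glob pattern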
@@ -1,3 +1,4 @@
import lib
source ~/xsh/functions.xsh

def _debug():
@@ -41,19 +42,60 @@ def _bwc(args: list):
    else:
        print(f"copied {i}")

def _mcrcon(args: list):
    if p"~/.mcrcon".exists():
        source "~/.mcrcon"
    else:
        raise FileNotFoundError("~/.mcrcon")
    $(mcrcon @(" ".join(args)))
def _backup(args: list):
    from pathlib import Path
    target_path = Path(args[0])
    max_backup_count: int = 3
    # Alias arguments arrive as strings, so check for digits rather than int instances.
    if len(args) == 2 and args[1].isdigit():
        max_backup_count = int(args[1])
    bm = lib.BackupManager(target_path)
    bm.create_backup()
    bm.delete_excess_backups(max_backup_count)

def _mcterm():
    if p"~/.mcrcon".exists():
        source "~/.mcrcon"
    else:
        raise FileNotFoundError("~/.mcrcon")
    mcrcon -t
def _mcrcon(args: list):
    import json
    try:
        with p"~/.mcrcon.json".open("r") as fp:
            servers_data = json.load(fp)
        server_data = servers_data[args[0]]
        $MCRCON_HOST = server_data["address"]
        $MCRCON_PORT = server_data["port"]
        $MCRCON_PASS = server_data["password"]
        $(mcrcon @(" ".join(args[1:])))
    except IndexError:
        print("Please indicate which server to connect to.")
    except KeyError:
        print("Server configuration not found.")
    except FileNotFoundError:
        print("Server configuration file '.mcrcon.json' is not found.")

def _mcterm(args: list):
    import json
    try:
        with p"~/.mcrcon.json".open("r") as fp:
            servers_data = json.load(fp)
        server_data = servers_data[args[0]]
        $MCRCON_HOST = server_data["address"]
        $MCRCON_PORT = server_data["port"]
        $MCRCON_PASS = server_data["password"]
        mcrcon -t
    except IndexError:
        print("Please indicate which server to connect to.")
    except KeyError:
        print("Server configuration not found.")
    except FileNotFoundError:
        print("Server configuration file '.mcrcon.json' is not found.")

def _mclist():
    import json
    try:
        with p"~/.mcrcon.json".open("r") as fp:
            servers_data = json.load(fp)
        print("Servers:")
        for key in servers_data:
            print("\t-", key)
    except FileNotFoundError:
        print("Server configuration file '.mcrcon.json' is not found.")

def _alias():
    for alias in aliases:
@@ -91,12 +133,15 @@ def _ls():

def load_aliases():
    aliases.update({
        'bu': _backup,
        'backup': _backup,
        'bwg': _bwc,
        'colortest': _colortest,
        'debug': _debug,
        'ls': 'ls -alhs --color=auto',
        'mc': _mcrcon,
        'mct': _mcterm,
        'mcl': _mclist,
        ':q': 'exit',
        'ensure-tmux': _ensure_tmux,
        'aliases': _alias,
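
Once load_aliases() has run, the new backup workflow can be driven straight from the xonsh prompt; the target path and retention counts below are made-up examples:

# hypothetical xonsh session
backup ~/notes    # zip ~/notes into ~/.storage/backups and keep the 3 newest archives
bu ~/notes 5      # same target, but keep the 5 newest archives
mcl               # list the servers configured in ~/.mcrcon.json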