Compare commits
No commits in common. "44555fc84e056e1c3a332787d8d2212566812b0e" and "eedecb34d52e5c8f9f83f26f97a1b712b9038d7e" have entirely different histories.
44555fc84e
...
eedecb34d5
3 changed files with 11 additions and 322 deletions
129
lib/backup.py
129
lib/backup.py
|
@ -1,129 +0,0 @@
|
||||||
"""
|
|
||||||
Backup Module:
|
|
||||||
|
|
||||||
Classes:
|
|
||||||
- BackupManager
|
|
||||||
"""
|
|
||||||
from datetime import datetime
|
|
||||||
from pathlib import Path
|
|
||||||
from zipfile import ZipFile
|
|
||||||
|
|
||||||
from . import Storage
|
|
||||||
|
|
||||||
__all__ = ["BackupManager"]
|
|
||||||
|
|
||||||
class BackupManager:
|
|
||||||
"""
|
|
||||||
Facilitates backup creation. Stores all backups in ~/.storage/backups
|
|
||||||
|
|
||||||
Methods:
|
|
||||||
- create_backup()
|
|
||||||
- get_backups()
|
|
||||||
- get_delete_candidates()
|
|
||||||
- delete_excess_backups()
|
|
||||||
"""
|
|
||||||
def __init__(self, target_path: Path, backup_storage: Storage = Storage("backups"),
|
|
||||||
date_format: str = "%d_%m_%y__%H%M%S", separator: str = "-") -> None:
|
|
||||||
"""
|
|
||||||
Test
|
|
||||||
"""
|
|
||||||
self.target_path = target_path
|
|
||||||
self.backup_storage = backup_storage
|
|
||||||
self.date_format = date_format
|
|
||||||
self.separator = separator
|
|
||||||
|
|
||||||
@property
|
|
||||||
def target_path(self) -> Path:
|
|
||||||
"""
|
|
||||||
Target to create backups of. This could be a file or folder.
|
|
||||||
"""
|
|
||||||
return self._target_path
|
|
||||||
|
|
||||||
@target_path.setter
|
|
||||||
def target_path(self, new_path: Path):
|
|
||||||
if isinstance(new_path, Path):
|
|
||||||
if new_path.exists() and (new_path.is_dir() or new_path.is_file()):
|
|
||||||
self._target_path = new_path
|
|
||||||
else:
|
|
||||||
raise ValueError(new_path)
|
|
||||||
else:
|
|
||||||
raise TypeError(new_path)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def backup_storage(self) -> Storage:
|
|
||||||
"""
|
|
||||||
Storage object to store created backups in.
|
|
||||||
"""
|
|
||||||
return self._backup_storage
|
|
||||||
|
|
||||||
@backup_storage.setter
|
|
||||||
def backup_storage(self, new_storage: Storage):
|
|
||||||
if isinstance(new_storage, Storage):
|
|
||||||
self._backup_storage = new_storage
|
|
||||||
else:
|
|
||||||
raise TypeError(new_storage)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def date_format(self) -> str:
|
|
||||||
"""The datetime format string used to date the backups."""
|
|
||||||
return self._date_format
|
|
||||||
|
|
||||||
@date_format.setter
|
|
||||||
def date_format(self, new_format):
|
|
||||||
if isinstance(new_format, str):
|
|
||||||
self._date_format = new_format
|
|
||||||
else:
|
|
||||||
raise TypeError(new_format)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def separator(self) -> str:
|
|
||||||
"""The separator that separates the archive name from the date"""
|
|
||||||
return self._separator
|
|
||||||
|
|
||||||
@separator.setter
|
|
||||||
def separator(self, new_separator):
|
|
||||||
if isinstance(new_separator, str):
|
|
||||||
if new_separator not in self.target_path.name and new_separator not in self.date_format:
|
|
||||||
self._separator = new_separator
|
|
||||||
else:
|
|
||||||
raise ValueError(new_separator)
|
|
||||||
else:
|
|
||||||
raise TypeError(new_separator)
|
|
||||||
|
|
||||||
def create_backup(self):
|
|
||||||
"""
|
|
||||||
Create a backup of the target path, stored in the backup storage folder.
|
|
||||||
"""
|
|
||||||
date_string = datetime.now().strftime(self.date_format)
|
|
||||||
backup_name = f"{self.target_path.name}{self.separator}{date_string}.zip"
|
|
||||||
backup_path = self.backup_storage.get_file(backup_name)
|
|
||||||
with ZipFile(backup_path, mode="w") as zip_file:
|
|
||||||
for item in self.target_path.glob("**/*"):
|
|
||||||
zip_file.write(item, item.relative_to(self.target_path))
|
|
||||||
|
|
||||||
def get_backups(self):
|
|
||||||
"""
|
|
||||||
Get all backups found in the given folder.
|
|
||||||
"""
|
|
||||||
return self.backup_storage.list_files(f"{self.target_path.stem}{self.separator}*.zip")
|
|
||||||
|
|
||||||
def get_delete_candidates(self, max_backup_count) -> list[Path]:
|
|
||||||
"""
|
|
||||||
Get all candidates for deletion with the given max_backup_count.
|
|
||||||
If none are available for deletion, returns None.
|
|
||||||
"""
|
|
||||||
def get_date(file: Path) -> datetime:
|
|
||||||
"""
|
|
||||||
Turns the datetime string in the file name into a datetime object.
|
|
||||||
"""
|
|
||||||
date_string = file.name.split(self.separator)[1].replace(file.suffix, "")
|
|
||||||
return datetime.strptime(date_string, self.date_format)
|
|
||||||
|
|
||||||
backups = self.get_backups()
|
|
||||||
backups.sort(key=get_date)
|
|
||||||
return backups[:(len(backups)-max_backup_count)] # returns the oldest excess backups
|
|
||||||
|
|
||||||
def delete_excess_backups(self, max_backup_count: int):
|
|
||||||
"""Delete all excess backups"""
|
|
||||||
for file in self.get_delete_candidates(max_backup_count):
|
|
||||||
file.unlink()
|
|
137
lib/storage.py
137
lib/storage.py
|
@ -1,137 +0,0 @@
|
||||||
"""
|
|
||||||
Storage Module:
|
|
||||||
|
|
||||||
Classes:
|
|
||||||
- Storage
|
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import shutil
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
__all__ = ['Storage']
|
|
||||||
|
|
||||||
class Storage:
|
|
||||||
"""
|
|
||||||
Unifies all filesystem access methods under a single class.
|
|
||||||
Minimizes room for mistakes and improves reusability.
|
|
||||||
|
|
||||||
Methods:
|
|
||||||
- get_file()
|
|
||||||
- get_folder()
|
|
||||||
- write_file()
|
|
||||||
- read_file()
|
|
||||||
- delete_file()
|
|
||||||
- rename_file()
|
|
||||||
- add_file()
|
|
||||||
- list_files()
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, folder: str, root_folder: str = ".storage") -> None:
|
|
||||||
"""
|
|
||||||
Create a new storage instance, it automatically creates and manages a folder in the user's home directory, all you have to do is supply a subfolder to store files in.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
folder (str): the name of the folder to use.
|
|
||||||
"""
|
|
||||||
self.root = Path.home().joinpath(root_folder)
|
|
||||||
self.folder = self.root.joinpath(folder)
|
|
||||||
self.folder.mkdir(exist_ok=True, parents=True)
|
|
||||||
|
|
||||||
def get_file(self, path: str) -> Path:
|
|
||||||
"""Get the fully qualified path of a file in the folder.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path (str): the name of the file to grab.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Path: the path of the file.
|
|
||||||
"""
|
|
||||||
return self.folder.joinpath(path)
|
|
||||||
|
|
||||||
def get_folder(self, name: str) -> Storage:
|
|
||||||
"""
|
|
||||||
Get a folder inside the Storage directory.
|
|
||||||
Returns a Storage object representing that folder.
|
|
||||||
"""
|
|
||||||
return Storage(name, root_folder=self.folder.name)
|
|
||||||
|
|
||||||
def write_file(self, name: str, data: dict):
|
|
||||||
"""Write data to the given file in JSON format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name (str): the name of the file.
|
|
||||||
data (dict): the data to write.
|
|
||||||
"""
|
|
||||||
file = self.get_file(name)
|
|
||||||
file.touch(exist_ok=True)
|
|
||||||
with file.open("w+", encoding="utf-8") as f:
|
|
||||||
json.dump(data, f, indent=4)
|
|
||||||
|
|
||||||
def read_file(self, name: str) -> dict:
|
|
||||||
"""Read data from the given file in JSON format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name (str): the name of the file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: the data from the file.
|
|
||||||
"""
|
|
||||||
file = self.get_file(name)
|
|
||||||
if not file.exists():
|
|
||||||
return {}
|
|
||||||
with file.open("r+", encoding="utf-8") as f:
|
|
||||||
data = json.load(f)
|
|
||||||
return data
|
|
||||||
|
|
||||||
def delete_file(self, name: str):
|
|
||||||
"""Delete the given file from the folder.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name (str): the name of the file.
|
|
||||||
"""
|
|
||||||
file = self.get_file(name)
|
|
||||||
file.unlink(missing_ok=True)
|
|
||||||
|
|
||||||
def rename_file(self, old_name: str, new_name: str):
|
|
||||||
"""Rename a file in the folder.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
old_name (str): the current name of the file.
|
|
||||||
new_name (str): the new name of the file.
|
|
||||||
"""
|
|
||||||
file = self.get_file(old_name)
|
|
||||||
new_file = self.folder.joinpath(new_name)
|
|
||||||
file.rename(new_file)
|
|
||||||
|
|
||||||
def add_file(self, name: str, path: Path | None = None, binary: bytes | None = None):
|
|
||||||
"""Add a copy of a file to the folder.
|
|
||||||
If a binary stream is given, it is saved directly to the named location.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name (str): The name to save it under.
|
|
||||||
path (Path): The path of the file to copy from.
|
|
||||||
binary (BinaryIO): The binary stream to copy from.
|
|
||||||
"""
|
|
||||||
if path is not None and binary is not None:
|
|
||||||
raise ValueError(binary, "Cannot supply both a path and a binary stream.")
|
|
||||||
elif path is not None:
|
|
||||||
shutil.copy(path, self.folder.joinpath(name))
|
|
||||||
elif binary is not None:
|
|
||||||
with self.folder.joinpath(name).open("wb+") as f:
|
|
||||||
f.write(binary)
|
|
||||||
|
|
||||||
def list_files(self, pattern: str | None = None) -> list[Path]:
|
|
||||||
"""
|
|
||||||
Return a list of all files in the directory.
|
|
||||||
"""
|
|
||||||
files: list[Path] = []
|
|
||||||
if pattern is None:
|
|
||||||
for file in self.folder.iterdir():
|
|
||||||
files.append(file)
|
|
||||||
else:
|
|
||||||
for file in self.folder.glob(pattern):
|
|
||||||
files.append(file)
|
|
||||||
return files
|
|
|
@ -1,4 +1,3 @@
|
||||||
import lib
|
|
||||||
source ~/xsh/functions.xsh
|
source ~/xsh/functions.xsh
|
||||||
|
|
||||||
def _debug():
|
def _debug():
|
||||||
|
@ -42,60 +41,19 @@ def _bwc(args: list):
|
||||||
else:
|
else:
|
||||||
print(f"copied {i}")
|
print(f"copied {i}")
|
||||||
|
|
||||||
def _backup(args: list):
|
|
||||||
from pathlib import Path
|
|
||||||
target_path = Path(args[0])
|
|
||||||
max_backup_count: int = 3
|
|
||||||
if len(args) == 2 and isinstance(args[1], int):
|
|
||||||
max_backup_count = int(args[1])
|
|
||||||
bm = lib.BackupManager(target_path)
|
|
||||||
bm.create_backup()
|
|
||||||
bm.delete_excess_backups(max_backup_count)
|
|
||||||
|
|
||||||
def _mcrcon(args: list):
|
def _mcrcon(args: list):
|
||||||
import json
|
if p"~/.mcrcon".exists():
|
||||||
try:
|
source "~/.mcrcon"
|
||||||
with p"~/.mcrcon.json".open("r") as fp:
|
else:
|
||||||
servers_data = json.load(fp)
|
raise FileNotFoundError("~/.mcrcon")
|
||||||
server_data = servers_data[args[0]]
|
$(mcrcon @(" ".join(args)))
|
||||||
$MCRCON_HOST = server_data["address"]
|
|
||||||
$MCRCON_PORT = server_data["port"]
|
|
||||||
$MCRCON_PASS = server_data["password"]
|
|
||||||
$(mcrcon @(" ".join(args[1:])))
|
|
||||||
except IndexError:
|
|
||||||
print("Please indicate which server to connect to.")
|
|
||||||
except KeyError:
|
|
||||||
print("Server configuration not found.")
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("Server configuration file '.mcrcon.json' is not found.")
|
|
||||||
|
|
||||||
def _mcterm(args: list):
|
def _mcterm():
|
||||||
import json
|
if p"~/.mcrcon".exists():
|
||||||
try:
|
source "~/.mcrcon"
|
||||||
with p"~/.mcrcon.json".open("r") as fp:
|
else:
|
||||||
servers_data = json.load(fp)
|
raise FileNotFoundError("~/.mcrcon")
|
||||||
server_data = servers_data[args[0]]
|
|
||||||
$MCRCON_HOST = server_data["address"]
|
|
||||||
$MCRCON_PORT = server_data["port"]
|
|
||||||
$MCRCON_PASS = server_data["password"]
|
|
||||||
mcrcon -t
|
mcrcon -t
|
||||||
except IndexError:
|
|
||||||
print("Please indicate which server to connect to.")
|
|
||||||
except KeyError:
|
|
||||||
print("Server configuration not found.")
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("Server configuration file '.mcrcon.json' is not found.")
|
|
||||||
|
|
||||||
def _mclist():
|
|
||||||
import json
|
|
||||||
try:
|
|
||||||
with p"~/.mcrcon.json".open("r") as fp:
|
|
||||||
servers_data = json.load(fp)
|
|
||||||
print("Servers:")
|
|
||||||
for key in servers_data:
|
|
||||||
print("\t-", key)
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("Server configuration file '.mcrcon.json' is not found.")
|
|
||||||
|
|
||||||
def _alias():
|
def _alias():
|
||||||
for alias in aliases:
|
for alias in aliases:
|
||||||
|
@ -133,15 +91,12 @@ def _ls():
|
||||||
|
|
||||||
def load_aliases():
|
def load_aliases():
|
||||||
aliases.update({
|
aliases.update({
|
||||||
'bu': _backup,
|
|
||||||
'backup': _backup,
|
|
||||||
'bwg': _bwc,
|
'bwg': _bwc,
|
||||||
'colortest': _colortest,
|
'colortest': _colortest,
|
||||||
'debug': _debug,
|
'debug': _debug,
|
||||||
'ls': 'ls -alhs --color=auto',
|
'ls': 'ls -alhs --color=auto',
|
||||||
'mc': _mcrcon,
|
'mc': _mcrcon,
|
||||||
'mct': _mcterm,
|
'mct': _mcterm,
|
||||||
'mcl': _mclist,
|
|
||||||
':q': 'exit',
|
':q': 'exit',
|
||||||
'ensure-tmux': _ensure_tmux,
|
'ensure-tmux': _ensure_tmux,
|
||||||
'aliases': _alias,
|
'aliases': _alias,
|
||||||
|
|
Loading…
Reference in a new issue