initial commit

This commit is contained in:
Marcin Kowalicki 2020-10-22 17:40:30 +02:00
commit 0422e09400
16 changed files with 2325 additions and 0 deletions

69
__init__.py Normal file
View file

@ -0,0 +1,69 @@
from librus_tricks import exceptions
from librus_tricks.auth import authorizer, load_json as __load_json
from librus_tricks.classes import *
from librus_tricks.core import SynergiaClient
__name__ = 'librus_tricks'
__title__ = 'librus_tricks'
__author__ = 'Krystian Postek'
__version__ = '0.8.1'
def create_session(email, password, fetch_first=True, **kwargs):
"""
Używaj tego tylko kiedy hasło do Portal Librus jest takie samo jako do Synergii.
:param email: str
:param password: str
:param fetch_first: bool or int
:rtype: librus_tricks.core.SynergiaClient
:return: obiekt lub listę obiektów z sesjami
"""
if fetch_first is True:
user = authorizer(email, password)[0]
session = SynergiaClient(user, **kwargs)
elif fetch_first is False:
users = authorizer(email, password)
sessions = [SynergiaClient(user, **kwargs) for user in users]
return sessions
else:
user = authorizer(email, password)[fetch_first]
session = SynergiaClient(user, **kwargs)
return session
def use_json(file=None, **kwargs):
if file is None:
from glob import glob
jsons = glob('*.json')
if jsons.__len__() == 0:
raise FileNotFoundError('Nie znaleziono zapisanych sesji')
if jsons.__len__() > 1:
raise FileExistsError('Zaleziono za dużo zapisanych sesji')
user = __load_json(open(jsons[0], 'r'))
else:
user = __load_json(file)
session = SynergiaClient(user, **kwargs)
session.get('Me')
return session
def minified_login(email, password, **kwargs):
import logging
import warnings
warnings.warn(exceptions.SecurityWarning('Using minified_login in production environment is REAL SECURITY ISSUE!'))
try:
logging.debug('Trying to use json file to create session')
session = use_json(**kwargs)
logging.debug('Created session using json file')
except Exception:
logging.debug('Switching to regular http auth')
session = create_session(email, password, **kwargs)
logging.debug('Created session using http auth')
session.user.dump_credentials()
return session

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

210
auth.py Normal file
View file

@ -0,0 +1,210 @@
import logging
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from .exceptions import *
# Some globals
REDIRURL = 'http://localhost/bar'
LOGINURL = 'https://portal.librus.pl/rodzina/login/action'
OAUTHURL = 'https://portal.librus.pl/oauth2/access_token'
SYNERGIAAUTHURL = 'https://portal.librus.pl/api/v2/SynergiaAccounts'
FRESHURL = 'https://portal.librus.pl/api/v2/SynergiaAccounts/fresh/{login}'
CLIENTID = '6XPsKf10LPz1nxgHQLcvZ1KM48DYzlBAhxipaXY8'
LIBRUSLOGINURL = f'https://portal.librus.pl/oauth2/authorize?client_id={CLIENTID}&redirect_uri={REDIRURL}&response_type=code'
# User agents
XIAOMI_USERAGENT = 'Mozilla/5.0 (Linux; Android 9; Mi A1 Build/PQ3B.190801.002; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3921.2 Mobile Safari/537.36LibrusMobileApp'
IPHONE_USERAGENT = 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/77.0.3865.103 Mobile/15E148 Safari/605.1LibrusMobileApp'
GOOGLEBOT_USERAGENT = 'Googlebot/2.1 (+http://www.google.com/bot.html)LibrusMobileApp'
class SynergiaUser:
"""
Obiekt zawierający dane do tworzenia sesji
"""
def __init__(self, user_dict, root_token, revalidation_token, exp_in):
self.token = user_dict['accessToken']
self.refresh_token = revalidation_token
self.root_token = root_token
self.name, self.last_name = user_dict['studentName'].split(' ', maxsplit=1)
self.login = user_dict['login']
self.uid = user_dict['id']
self.expires_in = datetime.now() + timedelta(seconds=exp_in)
def __repr__(self):
return f'<SynergiaUser for {self.name} {self.last_name} based on ' \
f'token {self.token[:6] + "..." + self.token[-6:]}>'
def __str__(self):
return f'{self.name} {self.last_name}'
def revalidate_root(self):
"""
Aktualizuje token do Portalu Librus.
"""
auth_session = requests.session()
new_tokens = auth_session.post(
OAUTHURL,
data={
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': CLIENTID
}
)
logging.debug('%s response %s', new_tokens.status_code, new_tokens.json())
try:
self.root_token = new_tokens.json()['access_token']
self.refresh_token = new_tokens.json()['refresh_token']
except KeyError:
raise LibrusTricksAuthException('Invalid payload recived', new_tokens.json())
def revalidate_user(self):
"""
Aktualizuje token dostępu do Synergii, który wygasa po 24h.
"""
def do_revalidation():
auth_session = requests.session()
new_token = auth_session.get(
FRESHURL.format(login=self.login),
headers={'Authorization': f'Bearer {self.root_token}'}
)
logging.debug('%s response %s', new_token.status_code, new_token.json())
return new_token
new_token = do_revalidation()
if new_token.json().get('error') == 'access_denied':
logging.info('Obtaing new token failed! Refreshing root token')
self.revalidate_root()
new_token = do_revalidation() # again...
try:
self.token = new_token.json()['accessToken']
except KeyError:
raise LibrusTricksAuthException('Invalid response received', new_token.json())
def check_is_expired(self, use_clock=True, use_query=True):
"""
:param bool use_clock: Sprawdza na podstawie czasu
:param bool use_query: Sprawdza poprzez zapytanie http GET na ``/Me``
:return: krotka z wynikami
:rtype: tuple[bool]
"""
clock_resp = None
query_resp = None
if use_clock:
if datetime.now() > self.expires_in:
clock_resp = False
else:
clock_resp = True
if use_query:
test = requests.get('https://api.librus.pl/2.0/Me', headers={'Authorization': f'Bearer {self.token}'})
if test.status_code == 401:
query_resp = False
else:
query_resp = True
return clock_resp, query_resp
@property
def is_valid(self):
"""
Umożliwia sprawdzenie czy konto ma jeszcze aktualny token.
:return: ``False`` - trzeba wyrobić nowy token
:rtype: bool
"""
return self.check_is_expired(use_clock=False)[1]
def dump_credentials(self, cred_file=None):
import json
if cred_file is None:
cred_file = open(f'{self.login}.json', 'w')
json.dump({
'user_dict': {
'accessToken': self.token,
'studentName': f'{self.name} {self.last_name}',
'id': self.uid,
'login': self.login,
},
'root_token': self.root_token,
'revalidation_token': self.refresh_token,
'exp_in': int(self.expires_in.timestamp())
}, cred_file)
def dict_credentials(self):
return {
'user_dict': {
'accessToken': self.token,
'studentName': f'{self.name} {self.last_name}',
'id': self.uid,
'login': self.login,
},
'root_token': self.root_token,
'revalidation_token': self.refresh_token,
'exp_in': int(self.expires_in.timestamp())
}
def load_json(cred_file):
import json
return SynergiaUser(**json.load(cred_file))
def authorizer(email, password, user_agent=None):
"""
Zwraca listę użytkowników dostępnych dla danego konta Librus Portal
:param str email: Email do Portalu Librus
:param str password: Hasło do Portalu Librus
:return: Listę z użytkownikami połączonymi do konta Librus Synergia
:rtype: list[librus_tricks.auth.SynergiaUser]
"""
if user_agent is None:
from random import choice
user_agent = choice([XIAOMI_USERAGENT, IPHONE_USERAGENT])
logging.debug('No user-agent specified, using %s', user_agent)
auth_session = requests.session()
auth_session.headers.update({'User-Agent': user_agent, 'X-Requested-With': 'pl.librus.synergiaDru2'})
site = auth_session.get(LIBRUSLOGINURL)
soup = BeautifulSoup(site.text, 'html.parser')
csrf = soup.find('meta', attrs={'name': 'csrf-token'})['content']
login_response_redirection = auth_session.post(
LOGINURL, json={'email': email, 'password': password},
headers={'X-CSRF-TOKEN': csrf, 'Content-Type': 'application/json'}
)
if login_response_redirection.status_code != 200:
if login_response_redirection.status_code == 403:
if 'g-recaptcha-response' in login_response_redirection.json()['errors']:
raise CaptchaRequired(login_response_redirection.json())
raise LibrusPortalInvalidPasswordError(login_response_redirection.json())
raise LibrusLoginError(login_response_redirection.text)
redirection_addr = login_response_redirection.json()['redirect']
redirection_response = auth_session.get(redirection_addr, allow_redirects=False)
oauth_code = redirection_response.headers['location'].replace('http://localhost/bar?code=', '')
synergia_root_response = auth_session.post(
OAUTHURL,
data={
'grant_type': 'authorization_code',
'code': oauth_code,
'client_id': CLIENTID,
'redirect_uri': REDIRURL
}
)
synergia_root_login_token = synergia_root_response.json()['access_token']
synergia_root_revalidation_token = synergia_root_response.json()['refresh_token']
synergia_root_expiration = synergia_root_response.json()['expires_in']
synergia_users_response = auth_session.get(SYNERGIAAUTHURL,
headers={'Authorization': f'Bearer {synergia_root_login_token}'})
synergia_users_raw = synergia_users_response.json()['accounts']
synergia_users = [
SynergiaUser(user_data, synergia_root_login_token, synergia_root_revalidation_token, synergia_root_expiration)
for user_data in synergia_users_raw]
return synergia_users

134
cache.py Normal file
View file

@ -0,0 +1,134 @@
from datetime import datetime
from sqlalchemy import create_engine, String, JSON, Column, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
class CacheBase:
def add_object(self, uid, cls, resource):
pass
def get_object(self, uid, cls):
raise NotImplementedError('get_object require implementation')
def del_object(self, uid):
raise NotImplementedError('del_object require implementation')
def clear_objects(self):
raise NotImplementedError('clear_objects require implementation')
def count_object(self):
raise NotImplementedError('count_object require implementation')
def add_query(self, uri, response, user_id):
pass
def get_query(self, uri, user_id):
raise NotImplementedError('get_query require implementation')
def del_query(self, uri, user_id):
raise NotImplementedError('del_query require implementation')
def clear_queries(self):
raise NotImplementedError('clear_queries require implementation')
def count_queries(self):
raise NotImplementedError('count_queries require implementation')
def about_backend(self):
raise NotImplementedError('required providing info about cache provider')
class DumbCache(CacheBase):
def get_object(self, uid, cls):
return
def get_query(self, uri, user_id):
return
def __repr__(self):
return f'<Just dumb cache>'
class AlchemyCache(CacheBase):
Base = declarative_base()
def __init__(self, engine_uri='sqlite:///:memory:'):
engine = create_engine(engine_uri, connect_args={'check_same_thread': False}, poolclass=StaticPool)
db_session = sessionmaker(bind=engine)
db_session.configure(bind=engine)
self.session = db_session()
self.Base.metadata.create_all(engine)
self.syn_session = None
class APIQueryCache(Base):
__tablename__ = 'uri_cache'
pk = Column(Integer(), primary_key=True)
uri = Column(String(length=512))
owner = Column(String(length=16))
response = Column(JSON())
last_load = Column(DateTime())
class ObjectLoadCache(Base):
__tablename__ = 'object_cache'
uid = Column(Integer(), primary_key=True)
name = Column(String(length=64))
resource = Column(JSON())
last_load = Column(DateTime())
def add_object(self, uid, cls, resource):
self.session.add(
self.ObjectLoadCache(uid=uid, name=cls.__name__, resource=resource, last_load=datetime.now())
)
self.session.commit()
def get_object(self, uid, cls):
"""
:rtype: AlchemyCache.ObjectLoadCache
"""
response = self.session.query(self.ObjectLoadCache).filter_by(uid=uid, name=cls.__name__).first()
if response is None:
return None
return cls.assembly(response.resource, self.syn_session)
def add_query(self, uri, response, user_id):
self.session.add(
self.APIQueryCache(uri=uri, response=response, last_load=datetime.now(), owner=user_id)
)
self.session.commit()
def get_query(self, uri, user_id):
"""
:rtype: AlchemyCache.APIQueryCache
"""
return self.session.query(self.APIQueryCache).filter_by(uri=uri, owner=user_id).first()
def del_query(self, uri, user_id):
self.session.query(self.APIQueryCache).filter_by(uri=uri, owner=user_id).delete()
self.session.commit()
def del_object(self, uid):
self.session.query(self.ObjectLoadCache).filter_by(uid=uid).delete()
self.session.commit()
def clear_queries(self):
self.session.query(self.APIQueryCache).delete()
def clear_objects(self):
self.session.query(self.ObjectLoadCache).delete()
def count_object(self):
return self.session.query(self.ObjectLoadCache).count()
def about_backend(self):
return f'SQLAlchemy ORM {self.session.bind.dialect.name} {self.session.bind.dialect.driver}'
def __repr__(self):
return f'<{self.__class__.__name__} with {self.session.bind.dialect.name} backend using {self.session.bind.dialect.driver} driver ({self.session.bind.url})>'

1080
classes.py Normal file

File diff suppressed because it is too large Load diff

483
core.py Normal file
View file

@ -0,0 +1,483 @@
import logging
from datetime import timedelta
import requests
from librus_tricks import cache as cache_lib
from librus_tricks import exceptions, tools
from librus_tricks.classes import *
from librus_tricks.messages import MessageReader
class SynergiaClient:
"""Sesja z API Synergii"""
def __init__(self, user, api_url='https://api.librus.pl/2.0', user_agent='LibrusMobileApp',
cache=cache_lib.AlchemyCache()):
"""
Tworzy sesję z API Synergii.
:param librus_tricks.auth.SynergiaUser user: Użytkownik sesji
:param str api_url: Bazowy url api, zmieniaj jeżeli chcesz używać proxy typu beeceptor
:param str user_agent: User-agent klienta http, domyślnie się podszywa pod aplikację
:param librus_tricks.cache.CacheBase cache: Obiekt, który zarządza cache
"""
self.user = user
self.session = requests.session()
self.session.headers.update({'User-Agent': user_agent})
self.__auth_headers = {'Authorization': f'Bearer {user.token}'}
self.__api_url = api_url
if cache_lib.CacheBase in cache.__class__.__bases__:
self.cache = cache
self.li_session = self
else:
raise exceptions.InvalidCacheManager(f'{cache} can not be a cache object!')
self.__message_reader = None
@property
def message_reader(self):
if self.__message_reader is None:
self.__message_reader = MessageReader(self)
return self.__message_reader
def __repr__(self):
return f'<Synergia session for {self.user}>'
def __update_auth_header(self):
self.__auth_headers = {'Authorization': f'Bearer {self.user.token}'}
logging.debug('Updating headers to %s', self.__auth_headers)
@staticmethod
def assembly_path(*elements, prefix='', suffix='', sep='/'):
"""
Składa str w jednego str, przydatne przy tworzeniu url.
:param str elements: Elementy do stworzenia str
:param str prefix: Początek str
:param str suffix: Koniec str
:param str sep: str wstawiany pomiędzy elementy
:return: Złożony str
:rtype: str
"""
for element in elements:
prefix += sep + str(element)
return prefix + suffix
# HTTP part
def dispatch_http_code(self, response: requests.Response, callback=None, callback_args=tuple(),
callback_kwargs=None):
"""
Sprawdza czy serwer zgłasza błąd poprzez podanie kodu http, w przypadku błędu, rzuca wyjątkiem.
:param requests.Response response:
:raises librus_tricks.exceptions.SynergiaNotFound: 404
:raises librus_tricks.exceptions.SynergiaForbidden: 403
:raises librus_tricks.exceptions.SynergiaAccessDenied: 401
:raises librus_tricks.exceptions.SynergiaInvalidRequest: 401
:rtype: requests.Response
:return: sprawdzona odpowiedź http
"""
if callback_kwargs is None:
callback_kwargs = dict()
logging.debug('Dispatching response status')
if response.json().get('Code') == 'TokenIsExpired':
logging.info('Server returned error code "TokenIsExpired", trying to obtain new token')
self.user.revalidate_user()
self.__update_auth_header()
logging.debug('Repeating failed response')
return callback(*callback_args, **callback_kwargs)
logging.debug('Dispatching http status code')
if response.status_code >= 400:
try:
raise {
503: exceptions.SynergiaMaintenanceError(response.url, response.json()),
500: exceptions.SynergiaServerError(response.url, response.json()),
404: exceptions.SynergiaAPIEndpointNotFound(response.url),
403: exceptions.SynergiaForbidden(response.url, response.json()),
401: exceptions.SynergiaAccessDenied(response.url, response.json()),
400: exceptions.SynergiaAPIInvalidRequest(response.url, response.json()),
}[response.status_code]
except KeyError:
raise exceptions.OtherHTTPResponse('Not excepted HTTP error code!', response.status_code)
return response.json()
def get(self, *path, request_params=None):
"""
Wykonuje odpowiednio spreparowane zapytanie http GET.
:param path: Ścieżka zawierająca węzeł API
:type path: str
:param request_params: dict zawierający kwargs dla zapytania http
:type request_params: dict
:return: json przekonwertowany na dict'a
:rtype: dict
"""
if request_params is None:
request_params = dict()
path_str = self.assembly_path(*path, prefix=self.__api_url)
response = self.session.get(
path_str, headers=self.__auth_headers, params=request_params
)
response = self.dispatch_http_code(response, callback=self.get, callback_args=path,
callback_kwargs=request_params)
return response
def post(self, *path, request_params=None):
"""
Pozwala na dokonanie zapytania http POST.
:param path: Ścieżka zawierająca węzeł API
:type path: str
:param request_params: dict zawierający kwargs dla zapytania http
:type request_params: dict
:return: json przekonwertowany na dict'a
:rtype: dict
"""
if request_params is None:
request_params = dict()
path_str = self.assembly_path(*path, prefix=self.__api_url)
response = self.session.post(
path_str, headers=self.__auth_headers, params=request_params
)
response = self.dispatch_http_code(response, callback=self.post, callback_args=path,
callback_kwargs=request_params)
return response
# Cache
def get_cached_response(self, *path, http_params=None, max_lifetime=timedelta(hours=1)):
"""
Wykonuje zapytanie http GET z poprzednim sprawdzeniem cache.
:param path: Niezłożona ścieżka do węzła API
:param http_params: dict zawierający kwargs dla zapytania http
:type http_params: dict
:param timedelta max_lifetime: Maksymalny czas ważności cache dla tego zapytania http
:return: dict zawierający odpowiedź zapytania
:rtype: dict
"""
uri = self.assembly_path(*path, prefix=self.__api_url)
response_cached = self.cache.get_query(uri, self.user.uid)
if response_cached is None:
logging.debug('Response is not present in cache!')
http_response = self.get(*path, request_params=http_params)
self.cache.add_query(uri, http_response, self.user.uid)
return http_response
try:
age = datetime.now() - response_cached.last_load
except TypeError:
age = datetime.now() - response_cached.last_load.replace(tzinfo=None)
if age > max_lifetime:
logging.debug('Response is too old! Trying to get latest response from api')
http_response = self.get(*path, request_params=http_params)
self.cache.del_query(uri, self.user.uid)
self.cache.add_query(uri, http_response, self.user.uid)
return http_response
return response_cached.response
def get_cached_object(self, uid, cls, max_lifetime=timedelta(hours=1)):
"""
Pobiera dany obiekt z poprzednim sprawdzeniem cache. Nie używane domyślnie w bibliotece.
:param str uid: Id żądanego obiektu
:param cls: Klasa żądanego obiektu
:param timedelta max_lifetime: Maksymalny czas ważności cache dla tego obiektu
:return: Żądany obiekt
"""
requested_object = self.cache.get_object(uid, cls)
if requested_object is None:
logging.debug('Obejct is not present in cache!')
requested_object = cls.create(uid=uid, session=self)
self.cache.add_object(uid, cls, requested_object.export_resource())
return requested_object
try:
age = datetime.now() - requested_object.last_load
except TypeError:
age = datetime.now() - requested_object.last_load.replace(tzinfo=None)
if age > max_lifetime:
logging.debug('Object is too old! Trying to get latest object from api')
requested_object = cls.create(uid=uid, session=self)
self.cache.del_object(uid)
self.cache.add_object(uid, cls, requested_object.export_resource())
return requested_object
# API query part
def return_objects(self, *path, cls, extraction_key=None, lifetime=timedelta(seconds=10), bypass_cache=False):
"""
Zwraca listę obiektów lub obiekt, wygenerowaną z danych danej ścieżki API.
:param str path: Niezłożona ścieżka do węzła API
:param cls: Klasa żądanych obiektów
:param str extraction_key: Klucz do wyjęcia danych, pozostawienie tego parametru na None powoduje
automatyczną próbę zczytania danych
:param timedelta lifetime: Maksymalny czas ważności cache dla tego zapytania http
:param bool bypass_cache: Ustawienie tego parametru na True powoduje ignorowanie mechanizmu cache
:return: Lista żądanych obiektów
:rtype: list[SynergiaGenericClass]
"""
if bypass_cache:
raw = self.get(*path)
else:
raw = self.get_cached_response(*path, max_lifetime=lifetime)
if extraction_key is None:
extraction_key = SynergiaGenericClass.auto_extract(raw)
raw = raw[extraction_key]
if isinstance(raw, list):
stack = []
for stored_payload in raw:
stack.append(cls.assembly(stored_payload, self))
return tuple(stack)
if isinstance(raw, dict):
return cls.assembly(raw, self)
return None
def grades(self, *grades):
"""
:param int grades: Id ocen
:rtype: tuple[librus_tricks.classes.SynergiaGrade]
:return: krotka z wszystkimi/wybranymi ocenami
"""
if grades.__len__() == 0:
return self.return_objects('Grades', cls=SynergiaGrade, extraction_key='Grades')
ids_computed = self.assembly_path(*grades, sep=',', suffix=',')[1:]
return self.return_objects('Grades', ids_computed, cls=SynergiaGrade, extraction_key='Grades')
@property
def grades_categorized(self):
grades_categorized = {}
for subject in self.subjects():
grades_categorized[subject.name] = []
for grade in self.grades():
grades_categorized[grade.subject.name].append(
grade
)
for subjects in grades_categorized.copy().keys():
if grades_categorized[subjects].__len__() == 0:
del (grades_categorized[subjects])
return grades_categorized
def attendances(self, *attendances):
"""
:param int attendances: Id obecności
:rtype: tuple[librus_tricks.classes.SynergiaAttendance]
:return: krotka z wszystkimi/wybranymi obecnościami
"""
if attendances.__len__() == 0:
return self.return_objects('Attendances', cls=SynergiaAttendance, extraction_key='Attendances')
ids_computed = self.assembly_path(*attendances, sep=',', suffix=',')[1:]
return self.return_objects('Attendances', ids_computed, cls=SynergiaAttendance, extraction_key='Attendances')
@property
def illegal_absences(self):
"""
:rtype: tuple[librus_tricks.classes.SynergiaAttendance]
:return: krotka z nieusprawiedliwionymi nieobecnościami
"""
def is_absence(k):
if k.type.uid == '1':
return True
return False
return tuple(filter(is_absence, self.attendances()))
@property
def all_absences(self):
"""
:rtype: tuple[librus_tricks.classes.SynergiaAttendance]
:return: krotka z wszystkimi nieobecnościami
"""
return tuple(filter(lambda k: not k.type.is_presence_kind, self.attendances()))
def exams(self, *exams):
"""
:param int exams: Id egzaminów
:rtype: tuple[librus_tricks.classes.SynergiaExam]
:return: krotka z wszystkimi egzaminami
"""
if exams.__len__() == 0:
return self.return_objects('HomeWorks', cls=SynergiaExam, extraction_key='HomeWorks')
ids_computed = self.assembly_path(*exams, sep=',', suffix=',')[1:]
return self.return_objects('HomeWorks', ids_computed, cls=SynergiaExam, extraction_key='HomeWorks')
def colors(self, *colors):
"""
:param int colors: Id kolorów
:rtype: tuple[librus_tricks.classes.SynergiaColors]
"""
if colors.__len__() == 0:
return self.return_objects('Colors', cls=SynergiaColor, extraction_key='Colors')
ids_computed = self.assembly_path(*colors, sep=',', suffix=',')[1:]
return self.return_objects('Colors', ids_computed, cls=SynergiaColor, extraction_key='Colors')
def timetable(self, for_date=datetime.now()):
"""
Plan lekcji na cały tydzień.
:param datetime.datetime for_date: Data dnia, który ma być w planie lekcji
:rtype: librus_tricks.classes.SynergiaTimetable
:return: obiekt tygodniowego planu lekcji
"""
monday = tools.get_actual_monday(for_date).isoformat()
matrix = self.get('Timetables', request_params={'weekStart': monday})
return SynergiaTimetable.assembly(matrix['Timetable'], self)
def timetable_day(self, for_date: datetime):
return self.timetable(for_date).days[for_date.date()]
@property
def today_timetable(self):
"""
Plan lekcji na dzisiejszy dzień.
:rtype: librus_tricks.classes.SynergiaTimetableDay
:return: Plan lekcji na dziś
"""
try:
return self.timetable().days[datetime.now().date()]
except KeyError:
return None
@property
def tomorrow_timetable(self):
"""
Plan lekcji na kolejny dzień.
:rtype: librus_tricks.classes.SynergiaTimetableDay
:return: Plan lekcji na jutro
"""
try:
return self.timetable(datetime.now() + timedelta(days=1)).days[(datetime.now() + timedelta(days=1)).date()]
except KeyError:
return None
def messages(self, *messages):
"""
Wymaga mobilnych dodatków.
:param int messages: Id wiadomości
:rtype: tuple[librus_tricks.classes.SynergiaNativeMessage]
"""
if messages.__len__() == 0:
return self.return_objects('Messages', cls=SynergiaNativeMessage, extraction_key='Messages')
ids_computed = self.assembly_path(*messages, sep=',', suffix=',')[1:]
return self.return_objects('Messages', ids_computed, cls=SynergiaNativeMessage, extraction_key='Messages')
def news_feed(self):
"""
:return: Wszystkie ogłoszenia szkolne
:rtype: tuple[librus_tricks.classes.SynergiaNews]
"""
return self.return_objects('SchoolNotices', cls=SynergiaNews, extraction_key='SchoolNotices')
def subjects(self, *subject):
"""
:return: Wszystkie/wybrane przedmioty lekcyjne
:param int subject: Id przedmiotów
:rtype: tuple[librus_tricks.classes.SynergiaSubject]
"""
if subject.__len__() == 0:
return self.return_objects('Subjects', cls=SynergiaSubject, extraction_key='Subjects')
ids_computed = self.assembly_path(*subject, sep=',', suffix=',')[1:]
return self.return_objects('Subjects', ids_computed, cls=SynergiaSubject, extraction_key='Subjects')
@property
def school(self):
"""
:return: Obiekt z informacjami o twojej szkole
:rtype: librus_tricks.classes.SynergiaSchool
"""
return self.return_objects('Schools', cls=SynergiaSchool, extraction_key='School')
@property
def lucky_number(self):
"""
:return: Szczęśliwy numerek
:rtype: int
"""
return self.get('LuckyNumbers')['LuckyNumber']['LuckyNumber']
@staticmethod
def __is_future(day):
"""
:type day: librus_tricks.classes.SynergiaTeacherFreeDays
:return:
"""
if day.ends >= datetime.now().date():
return True
return False
def teacher_free_days(self, *days_ids, only_future=True):
"""
Zwraca dane przedmioty.
:param int days_ids: Id zwolnień
:rtype: tuple[librus_tricks.classes.SynergiaTeacherFreeDays]
"""
if days_ids.__len__() == 0:
days = self.return_objects('Calendars', 'TeacherFreeDays', cls=SynergiaTeacherFreeDays)
else:
ids_computed = self.assembly_path(*days_ids, sep=',', suffix=',')[1:]
days = self.return_objects('Calendars', 'TeacherFreeDays', ids_computed, cls=SynergiaTeacherFreeDays)
days = tuple(sorted(days, key=lambda x: x.starts))
if only_future:
return tuple(filter(self.__is_future, days))
return days
def school_free_days(self, *days_ids, only_future=True):
if days_ids.__len__() == 0:
days = self.return_objects('Calendars', 'SchoolFreeDays', cls=SynergiaSchoolFreeDays)
else:
ids_computed = self.assembly_path(*days_ids, sep=',', suffix=',')[1:]
days = self.return_objects('Calendars', 'SchoolFreeDays', ids_computed, cls=SynergiaSchoolFreeDays)
days = tuple(sorted(days, key=lambda x: x.starts))
if only_future:
return tuple(filter(self.__is_future, days))
return days
def realizations(self, *realizations_ids):
if realizations_ids.__len__() == 0:
return self.return_objects('Realizations', cls=SynergiaRealization, extraction_key='Realizations')
ids_computed = self.assembly_path(*realizations_ids, sep=',', suffix=',')[1:]
return self.return_objects('Realizations', ids_computed, cls=SynergiaRealization, extraction_key='Realizations')
def substitutions(self):
pass
def preload_cache(self):
self.cache.clear_objects()
for thing in (*self.attendances(), *self.grades(), *self.subjects(), *self.school_free_days(only_future=False),
*self.teacher_free_days(only_future=False)):
self.cache.add_object(thing.uid, thing.__class__, thing.export_resource())
logging.info('Loaded %s objects into cache', self.cache.count_object())

82
exceptions.py Normal file
View file

@ -0,0 +1,82 @@
class LibrusTricksException(Exception):
pass
class LibrusTricksAuthException(LibrusTricksException):
pass
class LibrusTricksWrapperException(LibrusTricksAuthException):
pass
class LibrusLoginError(LibrusTricksAuthException):
pass
class SynergiaAPIEndpointNotFound(LibrusTricksWrapperException):
pass
class LibrusPortalInvalidPasswordError(LibrusTricksAuthException):
pass
class SynergiaAccessDenied(LibrusTricksWrapperException):
pass
class WrongHTTPMethod(Exception):
pass
class SynergiaAPIInvalidRequest(LibrusTricksWrapperException):
pass
class TokenExpired(LibrusTricksException):
pass
class SynergiaForbidden(LibrusTricksWrapperException):
pass
class InvalidCacheManager(LibrusTricksAuthException):
pass
class CaptchaRequired(LibrusTricksAuthException):
pass
class SynergiaServerError(LibrusTricksWrapperException):
pass
class SessionRequired(LibrusTricksWrapperException):
pass
class APIPathIsEmpty(LibrusTricksWrapperException):
pass
class OtherHTTPResponse(LibrusTricksWrapperException):
pass
class SecurityWarning(Warning):
pass
class PerformanceWarning(Warning):
pass
class GoodPracticeWarning(Warning):
pass
class SynergiaMaintenanceError(SynergiaServerError):
pass

82
messages.py Normal file
View file

@ -0,0 +1,82 @@
from datetime import datetime
import logging
import requests
from bs4 import BeautifulSoup
class SynergiaScrappedMessage:
def __init__(self, url, parent_web_session, header, author, message_date, synergia_session):
"""
:type url: str
:type parent_web_session: requests.sessions.Session
:type message_date: datetime
:type synergia_session: librus_tricks.core.SynergiaClient
"""
self.web_session = parent_web_session
self.url = url
self.header = header
self.author_alias = author
self.msg_date = message_date
self.synergia_session = synergia_session
def __read_from_server(self):
response = self.web_session.get('https://synergia.librus.pl' + self.url)
soup = BeautifulSoup(response.text, 'html.parser')
return soup.find('div', attrs={'class': 'container-message-content'}).text
@property
def text(self):
return self.__read_from_server()
@property
def author(self):
from librus_tricks.classes import SynergiaTeacher
teachers = self.synergia_session.return_objects('Users', cls=SynergiaTeacher, extraction_key='Users')
for teacher in teachers:
if str(teacher.name) in self.author_alias and str(teacher.last_name) in self.author_alias:
return teacher
return
def __repr__(self):
return f'<Message from {self.author_alias} into {self.url}>'
class MessageReader:
def __init__(self, session):
"""
:param librus_tricks.core.SynergiaClient session:
"""
self._syn_session = session
self._web_session = requests.session()
logging.debug('Obtain AutoLoginToken from server')
token = session.post('AutoLoginToken')['Token']
self._web_session.get(f'https://synergia.librus.pl/loguj/token/{token}/przenies/wiadomosci')
logging.debug('Webscrapper logged into Synergia')
def read_messages(self):
response = self._web_session.get('https://synergia.librus.pl/wiadomosci')
soup = BeautifulSoup(response.text, 'html.parser')
table = soup.find('table', attrs={'class': 'decorated stretch'})
tbody = table.find('tbody')
if 'Brak wiadomości' in tbody.text:
return None
rows = tbody.find_all('tr')
messages = []
for message in rows:
cols = message.find_all('td')
messages.append(SynergiaScrappedMessage(
url=cols[3].a['href'],
header=cols[3].text.strip(),
author=cols[2].text.strip(),
parent_web_session=self._web_session,
message_date=datetime.strptime(cols[4].text, '%Y-%m-%d %H:%M:%S'),
synergia_session=self._syn_session
))
return messages
def __repr__(self):
return f'<{self.__class__.__name__} for {self._syn_session.user}>'

185
tools.py Normal file
View file

@ -0,0 +1,185 @@
from datetime import datetime, timedelta
import re
def get_next_monday(now=datetime.now()):
for _ in range(8):
if now.weekday() == 0:
return now.date()
now = now + timedelta(days=1)
return
def get_actual_monday(now=datetime.now()):
for _ in range(8):
if now.weekday() == 0:
return now.date()
now = now - timedelta(days=1)
return
def extract_percentage(grade):
"""
:param librus_tricks.classes.SynergiaGrade grade:
:return:
"""
for comment in grade.comments:
matches = re.findall(r'(\d+)%', comment.text)
if matches.__len__() > 0:
return float(matches[0])
return
def weighted_average(*grades_and_weights):
values = 0
count = 0
for grade_weight in grades_and_weights:
count += grade_weight[1]
for _ in range(grade_weight[1]):
values += grade_weight[0]
if count == 0:
return 0
return values / count
def extracted_percentages(grades):
grades = [grade for grade in grades if extract_percentage(grade) is not None]
subjects = set([grade.subject.name for grade in grades])
categorized = {}
for subject in subjects:
categorized[subject] = []
for grade in grades:
categorized[grade.subject.name].append((grade, extract_percentage(grade)))
return categorized
def no_cache(func):
def wrapper(*args, **kwargs):
return func(*args, **{**kwargs, 'expire': 0})
return wrapper
def percentage_average(grades, generic_top_value=5):
def compare_lists(list_a, list_b):
result = []
for item in list_a:
if item not in list_b:
result.append(item)
return result
percentages = extracted_percentages(grades)
if percentages.keys().__len__() == 0:
return {}
generics = compare_lists(grades, [grade_weight_tuple[0] for grade_weight_tuple in tuple(percentages.values())[0]])
averages = {}
for subject_name in percentages:
averages[subject_name] = weighted_average(
*[(grade_percent[1], grade_percent[0].category.weight) for grade_percent in percentages[subject_name]],
*[((generic.real_value / generic_top_value) * 100, generic.category.weight) for generic in generics if
generic_top_value is not None or not False if generic.real_value is not None if
generic.subject.name == subject_name]
)
return averages
def subjects_averages(subject_keyed_grades):
averages = {}
for subject in subject_keyed_grades:
averages[subject] = weighted_average(
*[(grade.real_value, grade.category.weight) for grade in subject_keyed_grades[subject] if
grade.real_value is not None]
)
return averages
def count_attendances(attendances):
"""
:param iterable[librus_tricks.classes.SynergiaAttendance] attendances:
:return:
"""
categories = set()
for attendance in attendances:
categories.add(attendance.type)
results = {}
for cat in categories:
results[cat] = 0
for attendance in attendances:
results[attendance.type] += 1
return results
def present_percentage(attendances):
"""
:param list[librus_tricks.classes.SynergiaAttendance] attendances:
:return:
"""
present = 0
absent = 0
for attendance in attendances:
if attendance.type.is_presence_kind:
present += 1
else:
absent += 1
return present / attendances.__len__() * 100
def percentages_of_attendances(attendances):
"""
:param list[librus_tricks.classes.SynergiaAttendance] attendances:
:return:
"""
results = count_attendances(attendances)
for category in results:
results[category] = results[category] / attendances.__len__() * 100
return results
def attendance_per_subject(attendances):
"""
:type attendances: list of librus_tricks.classes.SynergiaAttendance
"""
subjects = set()
att_types = set()
for att in attendances:
subjects.add(
att.lesson.subject
)
att_types.add(
att.type
)
attendances_by_subject = dict()
for sub in subjects:
attendances_by_subject[sub] = dict()
for attyp in att_types:
attendances_by_subject[sub][attyp] = list()
for att in attendances:
attendances_by_subject[att.lesson.subject][att.type].append(
att
)
redundant = []
for subject in attendances_by_subject:
for at_type in attendances_by_subject[subject]:
if attendances_by_subject[subject][at_type].__len__() == 0:
redundant.append((subject, at_type))
for n in redundant:
del(attendances_by_subject[n[0]][n[1]])
return attendances_by_subject