"""Authentication helpers for Librus Portal and Librus Synergia accounts."""
import json
import logging
import random
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup

from .exceptions import *

# Some globals
REDIRURL = 'http://localhost/bar'
LOGINURL = 'https://portal.librus.pl/rodzina/login/action'
OAUTHURL = 'https://portal.librus.pl/oauth2/access_token'
SYNERGIAAUTHURL = 'https://portal.librus.pl/api/v2/SynergiaAccounts'
FRESHURL = 'https://portal.librus.pl/api/v2/SynergiaAccounts/fresh/{login}'
CLIENTID = '6XPsKf10LPz1nxgHQLcvZ1KM48DYzlBAhxipaXY8'
LIBRUSLOGINURL = f'https://portal.librus.pl/oauth2/authorize?client_id={CLIENTID}&redirect_uri={REDIRURL}&response_type=code'

# User agents
XIAOMI_USERAGENT = 'Mozilla/5.0 (Linux; Android 9; Mi A1 Build/PQ3B.190801.002; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3921.2 Mobile Safari/537.36LibrusMobileApp'
IPHONE_USERAGENT = 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/77.0.3865.103 Mobile/15E148 Safari/605.1LibrusMobileApp'
GOOGLEBOT_USERAGENT = 'Googlebot/2.1 (+http://www.google.com/bot.html)LibrusMobileApp'


class SynergiaUser:
    """
    Holds the tokens and identity data needed to create a Synergia session.
    """

    def __init__(self, user_dict, root_token, revalidation_token, exp_in):
        """
        :param dict user_dict: account entry with the keys ``accessToken``,
            ``studentName``, ``login`` and ``id``
        :param str root_token: Librus Portal access token
        :param str revalidation_token: Librus Portal refresh token
        :param exp_in: token lifetime in seconds, counted from now
        """
        self.token = user_dict['accessToken']
        self.refresh_token = revalidation_token
        self.root_token = root_token
        # partition() matches split(' ', maxsplit=1) whenever a space is
        # present, but does not fail to unpack on a single-word name.
        self.name, _, self.last_name = user_dict['studentName'].partition(' ')
        self.login = user_dict['login']
        self.uid = user_dict['id']
        self.expires_in = datetime.now() + timedelta(seconds=exp_in)

    def __repr__(self):
        # Tokens are deliberately left out so they do not end up in logs.
        return f'<{type(self).__name__} {self.name} {self.last_name} ({self.login})>'

    def __str__(self):
        return f'{self.name} {self.last_name}'

    @classmethod
    def from_credentials(cls, credentials):
        """
        Rebuild a user from the mapping produced by :meth:`dict_credentials`.

        The stored ``exp_in`` is an absolute epoch timestamp, whereas the
        constructor expects a lifetime in seconds, so it is converted back to
        the number of seconds remaining (negative when already expired).

        :param dict credentials: mapping with the keys ``user_dict``,
            ``root_token``, ``revalidation_token`` and ``exp_in``
        :rtype: SynergiaUser
        """
        kwargs = dict(credentials)
        kwargs['exp_in'] = kwargs['exp_in'] - datetime.now().timestamp()
        return cls(**kwargs)

    def revalidate_root(self):
        """
        Refresh the Librus Portal token pair using the refresh token.

        :raises LibrusTricksAuthException: when the response lacks
            ``access_token`` or ``refresh_token``
        """
        with requests.session() as auth_session:
            response = auth_session.post(
                OAUTHURL,
                data={
                    'grant_type': 'refresh_token',
                    'refresh_token': self.refresh_token,
                    'client_id': CLIENTID
                }
            )
            payload = response.json()
        logging.debug('%s response %s', response.status_code, payload)
        try:
            root_token = payload['access_token']
            refresh_token = payload['refresh_token']
        except KeyError as err:
            raise LibrusTricksAuthException('Invalid payload recived', payload) from err
        # Assign only after both values were found, so a partial payload
        # cannot leave the tokens out of sync.
        self.root_token = root_token
        self.refresh_token = refresh_token

    def revalidate_user(self):
        """
        Refresh the Synergia access token.

        When the portal answers ``access_denied`` the root token is refreshed
        first and the request is retried once.

        :raises LibrusTricksAuthException: when the response lacks
            ``accessToken``
        """

        def fetch_fresh_account():
            with requests.session() as auth_session:
                response = auth_session.get(
                    FRESHURL.format(login=self.login),
                    headers={'Authorization': f'Bearer {self.root_token}'}
                )
                payload = response.json()
            logging.debug('%s response %s', response.status_code, payload)
            return payload

        payload = fetch_fresh_account()
        if payload.get('error') == 'access_denied':
            logging.info('Obtaing new token failed! Refreshing root token')
            self.revalidate_root()
            payload = fetch_fresh_account()  # retry with the new root token
        try:
            self.token = payload['accessToken']
        except KeyError as err:
            raise LibrusTricksAuthException('Invalid response received', payload) from err

    def check_is_expired(self, use_clock=True, use_query=True):
        """
        Check whether the Synergia token is still usable.

        Despite the method name, each flag is ``True`` when the token is
        still valid and ``False`` when it has expired; a check that was not
        requested yields ``None``.

        :param bool use_clock: compare the current time with the stored expiry
        :param bool use_query: send an HTTP GET to ``/Me`` and look for a 401
        :return: tuple ``(clock_result, query_result)``
        :rtype: tuple[bool]
        """
        clock_resp = None
        query_resp = None
        if use_clock:
            clock_resp = datetime.now() <= self.expires_in
        if use_query:
            test = requests.get(
                'https://api.librus.pl/2.0/Me',
                headers={'Authorization': f'Bearer {self.token}'}
            )
            query_resp = test.status_code != 401
        return clock_resp, query_resp

    @property
    def is_valid(self):
        """
        Tell whether the account still has a working token (HTTP check only).

        :return: ``False`` - a new token has to be obtained
        :rtype: bool
        """
        return self.check_is_expired(use_clock=False)[1]

    def dump_credentials(self, cred_file=None):
        """
        Serialize the credentials as JSON.

        :param cred_file: writable text file object; when ``None`` the data is
            written to ``<login>.json`` in the current directory. A file
            object passed in by the caller is left open.
        """
        credentials = self.dict_credentials()
        if cred_file is None:
            # The file is opened here, so it is also closed here.
            with open(f'{self.login}.json', 'w') as own_file:
                json.dump(credentials, own_file)
        else:
            json.dump(credentials, cred_file)

    def dict_credentials(self):
        """
        Return the credentials as a JSON-serializable dict.

        ``exp_in`` is stored as an absolute epoch timestamp (whole seconds),
        not as a lifetime; use :meth:`from_credentials` to rebuild a user.

        :rtype: dict
        """
        return {
            'user_dict': {
                'accessToken': self.token,
                'studentName': f'{self.name} {self.last_name}',
                'id': self.uid,
                'login': self.login,
            },
            'root_token': self.root_token,
            'revalidation_token': self.refresh_token,
            'exp_in': int(self.expires_in.timestamp())
        }


def load_json(cred_file):
    """
    Load a user previously saved with ``SynergiaUser.dump_credentials``.

    :param cred_file: readable text file object containing the JSON dump
    :rtype: SynergiaUser
    """
    return SynergiaUser.from_credentials(json.load(cred_file))


def authorizer(email, password, user_agent=None):
    """
    Return the list of users available for the given Librus Portal account.

    :param str email: Librus Portal e-mail
    :param str password: Librus Portal password
    :param str user_agent: User-Agent header to send; when ``None`` one of the
        mobile user agents is picked at random
    :return: users linked to the Librus Synergia account
    :rtype: list[librus_tricks.auth.SynergiaUser]
    :raises CaptchaRequired: on HTTP 403 with ``g-recaptcha-response`` among
        the reported errors
    :raises LibrusPortalInvalidPasswordError: on any other HTTP 403
    :raises LibrusLoginError: on any other non-200 login response, or when the
        login page carries no CSRF token
    """
    if user_agent is None:
        user_agent = random.choice([XIAOMI_USERAGENT, IPHONE_USERAGENT])
        logging.debug('No user-agent specified, using %s', user_agent)

    with requests.session() as auth_session:
        auth_session.headers.update({
            'User-Agent': user_agent,
            'X-Requested-With': 'pl.librus.synergiaDru2'
        })

        # The login page embeds the CSRF token required by the login call.
        site = auth_session.get(LIBRUSLOGINURL)
        soup = BeautifulSoup(site.text, 'html.parser')
        csrf_tag = soup.find('meta', attrs={'name': 'csrf-token'})
        if csrf_tag is None:
            raise LibrusLoginError('CSRF token not found on the login page')
        csrf = csrf_tag['content']

        login_response = auth_session.post(
            LOGINURL,
            json={'email': email, 'password': password},
            headers={'X-CSRF-TOKEN': csrf, 'Content-Type': 'application/json'}
        )
        if login_response.status_code != 200:
            if login_response.status_code == 403:
                error_payload = login_response.json()
                if 'g-recaptcha-response' in error_payload['errors']:
                    raise CaptchaRequired(error_payload)
                raise LibrusPortalInvalidPasswordError(error_payload)
            raise LibrusLoginError(login_response.text)

        # Redirects are not followed: the OAuth code is read from the
        # Location header that points at REDIRURL.
        redirection_addr = login_response.json()['redirect']
        redirection_response = auth_session.get(redirection_addr, allow_redirects=False)
        oauth_code = redirection_response.headers['location'].replace(f'{REDIRURL}?code=', '')

        root_payload = auth_session.post(
            OAUTHURL,
            data={
                'grant_type': 'authorization_code',
                'code': oauth_code,
                'client_id': CLIENTID,
                'redirect_uri': REDIRURL
            }
        ).json()
        synergia_root_login_token = root_payload['access_token']
        synergia_root_revalidation_token = root_payload['refresh_token']
        synergia_root_expiration = root_payload['expires_in']

        synergia_users_response = auth_session.get(
            SYNERGIAAUTHURL,
            headers={'Authorization': f'Bearer {synergia_root_login_token}'}
        )
        synergia_users_raw = synergia_users_response.json()['accounts']

    return [
        SynergiaUser(
            user_data,
            synergia_root_login_token,
            synergia_root_revalidation_token,
            synergia_root_expiration
        )
        for user_data in synergia_users_raw
    ]