1309 lines
52 KiB
Python
1309 lines
52 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
__all__ = [
|
||
|
"SpotifyClientCredentials",
|
||
|
"SpotifyOAuth",
|
||
|
"SpotifyOauthError",
|
||
|
"SpotifyStateError",
|
||
|
"SpotifyImplicitGrant",
|
||
|
"SpotifyPKCE"
|
||
|
]
|
||
|
|
||
|
import base64
|
||
|
import logging
|
||
|
import os
|
||
|
import time
|
||
|
import warnings
|
||
|
import webbrowser
|
||
|
|
||
|
import requests
|
||
|
# Workaround to support both python 2 & 3
|
||
|
import six
|
||
|
import six.moves.urllib.parse as urllibparse
|
||
|
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
|
||
|
from six.moves.urllib_parse import parse_qsl, urlparse
|
||
|
|
||
|
from spotipy.cache_handler import CacheFileHandler, CacheHandler
|
||
|
from spotipy.util import CLIENT_CREDS_ENV_VARS, get_host_port, normalize_scope
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class SpotifyOauthError(Exception):
|
||
|
""" Error during Auth Code or Implicit Grant flow """
|
||
|
|
||
|
def __init__(self, message, error=None, error_description=None, *args, **kwargs):
|
||
|
self.error = error
|
||
|
self.error_description = error_description
|
||
|
self.__dict__.update(kwargs)
|
||
|
super(SpotifyOauthError, self).__init__(message, *args, **kwargs)
|
||
|
|
||
|
|
||
|
class SpotifyStateError(SpotifyOauthError):
|
||
|
""" The state sent and state received were different """
|
||
|
|
||
|
def __init__(self, local_state=None, remote_state=None, message=None,
|
||
|
error=None, error_description=None, *args, **kwargs):
|
||
|
if not message:
|
||
|
message = ("Expected " + local_state + " but recieved "
|
||
|
+ remote_state)
|
||
|
super(SpotifyOauthError, self).__init__(message, error,
|
||
|
error_description, *args,
|
||
|
**kwargs)
|
||
|
|
||
|
|
||
|
def _make_authorization_headers(client_id, client_secret):
|
||
|
auth_header = base64.b64encode(
|
||
|
six.text_type(client_id + ":" + client_secret).encode("ascii")
|
||
|
)
|
||
|
return {"Authorization": "Basic %s" % auth_header.decode("ascii")}
|
||
|
|
||
|
|
||
|
def _ensure_value(value, env_key):
|
||
|
env_val = CLIENT_CREDS_ENV_VARS[env_key]
|
||
|
_val = value or os.getenv(env_val)
|
||
|
if _val is None:
|
||
|
msg = "No %s. Pass it or set a %s environment variable." % (
|
||
|
env_key,
|
||
|
env_val,
|
||
|
)
|
||
|
raise SpotifyOauthError(msg)
|
||
|
return _val
|
||
|
|
||
|
|
||
|
class SpotifyAuthBase(object):
|
||
|
def __init__(self, requests_session):
|
||
|
if isinstance(requests_session, requests.Session):
|
||
|
self._session = requests_session
|
||
|
else:
|
||
|
if requests_session: # Build a new session.
|
||
|
self._session = requests.Session()
|
||
|
else: # Use the Requests API module as a "session".
|
||
|
from requests import api
|
||
|
self._session = api
|
||
|
|
||
|
def _normalize_scope(self, scope):
|
||
|
return normalize_scope(scope)
|
||
|
|
||
|
@property
|
||
|
def client_id(self):
|
||
|
return self._client_id
|
||
|
|
||
|
@client_id.setter
|
||
|
def client_id(self, val):
|
||
|
self._client_id = _ensure_value(val, "client_id")
|
||
|
|
||
|
@property
|
||
|
def client_secret(self):
|
||
|
return self._client_secret
|
||
|
|
||
|
@client_secret.setter
|
||
|
def client_secret(self, val):
|
||
|
self._client_secret = _ensure_value(val, "client_secret")
|
||
|
|
||
|
@property
|
||
|
def redirect_uri(self):
|
||
|
return self._redirect_uri
|
||
|
|
||
|
@redirect_uri.setter
|
||
|
def redirect_uri(self, val):
|
||
|
self._redirect_uri = _ensure_value(val, "redirect_uri")
|
||
|
|
||
|
@staticmethod
|
||
|
def _get_user_input(prompt):
|
||
|
try:
|
||
|
return raw_input(prompt)
|
||
|
except NameError:
|
||
|
return input(prompt)
|
||
|
|
||
|
@staticmethod
|
||
|
def is_token_expired(token_info):
|
||
|
now = int(time.time())
|
||
|
return token_info["expires_at"] - now < 60
|
||
|
|
||
|
@staticmethod
|
||
|
def _is_scope_subset(needle_scope, haystack_scope):
|
||
|
needle_scope = set(needle_scope.split()) if needle_scope else set()
|
||
|
haystack_scope = (
|
||
|
set(haystack_scope.split()) if haystack_scope else set()
|
||
|
)
|
||
|
return needle_scope <= haystack_scope
|
||
|
|
||
|
def _handle_oauth_error(self, http_error):
|
||
|
response = http_error.response
|
||
|
try:
|
||
|
error_payload = response.json()
|
||
|
error = error_payload.get('error')
|
||
|
error_description = error_payload.get('error_description')
|
||
|
except ValueError:
|
||
|
# if the response cannot be decoded into JSON (which raises a ValueError),
|
||
|
# then try to decode it into text
|
||
|
|
||
|
# if we receive an empty string (which is falsy), then replace it with `None`
|
||
|
error = response.text or None
|
||
|
error_description = None
|
||
|
|
||
|
raise SpotifyOauthError(
|
||
|
'error: {0}, error_description: {1}'.format(
|
||
|
error, error_description
|
||
|
),
|
||
|
error=error,
|
||
|
error_description=error_description
|
||
|
)
|
||
|
|
||
|
def __del__(self):
|
||
|
"""Make sure the connection (pool) gets closed"""
|
||
|
if isinstance(self._session, requests.Session):
|
||
|
self._session.close()
|
||
|
|
||
|
|
||
|
class SpotifyClientCredentials(SpotifyAuthBase):
|
||
|
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
client_id=None,
|
||
|
client_secret=None,
|
||
|
proxies=None,
|
||
|
requests_session=True,
|
||
|
requests_timeout=None,
|
||
|
cache_handler=None
|
||
|
):
|
||
|
"""
|
||
|
Creates a Client Credentials Flow Manager.
|
||
|
|
||
|
The Client Credentials flow is used in server-to-server authentication.
|
||
|
Only endpoints that do not access user information can be accessed.
|
||
|
This means that endpoints that require authorization scopes cannot be accessed.
|
||
|
The advantage, however, of this authorization flow is that it does not require any
|
||
|
user interaction
|
||
|
|
||
|
You can either provide a client_id and client_secret to the
|
||
|
constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
|
||
|
environment variables
|
||
|
|
||
|
Parameters:
|
||
|
* client_id: Must be supplied or set as environment variable
|
||
|
* client_secret: Must be supplied or set as environment variable
|
||
|
* proxies: Optional, proxy for the requests library to route through
|
||
|
* requests_session: A Requests session
|
||
|
* requests_timeout: Optional, tell Requests to stop waiting for a response after
|
||
|
a given number of seconds
|
||
|
* cache_handler: An instance of the `CacheHandler` class to handle
|
||
|
getting and saving cached authorization tokens.
|
||
|
Optional, will otherwise use `CacheFileHandler`.
|
||
|
(takes precedence over `cache_path` and `username`)
|
||
|
|
||
|
"""
|
||
|
|
||
|
super(SpotifyClientCredentials, self).__init__(requests_session)
|
||
|
|
||
|
self.client_id = client_id
|
||
|
self.client_secret = client_secret
|
||
|
self.proxies = proxies
|
||
|
self.requests_timeout = requests_timeout
|
||
|
if cache_handler:
|
||
|
assert issubclass(cache_handler.__class__, CacheHandler), \
|
||
|
"cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
|
||
|
+ " != " + str(CacheHandler)
|
||
|
self.cache_handler = cache_handler
|
||
|
else:
|
||
|
self.cache_handler = CacheFileHandler()
|
||
|
|
||
|
def get_access_token(self, as_dict=True, check_cache=True):
|
||
|
"""
|
||
|
If a valid access token is in memory, returns it
|
||
|
Else fetches a new token and returns it
|
||
|
|
||
|
Parameters:
|
||
|
- as_dict - a boolean indicating if returning the access token
|
||
|
as a token_info dictionary, otherwise it will be returned
|
||
|
as a string.
|
||
|
"""
|
||
|
if as_dict:
|
||
|
warnings.warn(
|
||
|
"You're using 'as_dict = True'."
|
||
|
"get_access_token will return the token string directly in future "
|
||
|
"versions. Please adjust your code accordingly, or use "
|
||
|
"get_cached_token instead.",
|
||
|
DeprecationWarning,
|
||
|
stacklevel=2,
|
||
|
)
|
||
|
|
||
|
if check_cache:
|
||
|
token_info = self.cache_handler.get_cached_token()
|
||
|
if token_info and not self.is_token_expired(token_info):
|
||
|
return token_info if as_dict else token_info["access_token"]
|
||
|
|
||
|
token_info = self._request_access_token()
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return token_info if as_dict else token_info["access_token"]
|
||
|
|
||
|
def _request_access_token(self):
|
||
|
"""Gets client credentials access token """
|
||
|
payload = {"grant_type": "client_credentials"}
|
||
|
|
||
|
headers = _make_authorization_headers(
|
||
|
self.client_id, self.client_secret
|
||
|
)
|
||
|
|
||
|
logger.debug(
|
||
|
"sending POST request to %s with Headers: %s and Body: %r",
|
||
|
self.OAUTH_TOKEN_URL, headers, payload
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
response = self._session.post(
|
||
|
self.OAUTH_TOKEN_URL,
|
||
|
data=payload,
|
||
|
headers=headers,
|
||
|
verify=True,
|
||
|
proxies=self.proxies,
|
||
|
timeout=self.requests_timeout,
|
||
|
)
|
||
|
response.raise_for_status()
|
||
|
token_info = response.json()
|
||
|
return token_info
|
||
|
except requests.exceptions.HTTPError as http_error:
|
||
|
self._handle_oauth_error(http_error)
|
||
|
|
||
|
def _add_custom_values_to_token_info(self, token_info):
|
||
|
"""
|
||
|
Store some values that aren't directly provided by a Web API
|
||
|
response.
|
||
|
"""
|
||
|
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
|
||
|
return token_info
|
||
|
|
||
|
|
||
|
class SpotifyOAuth(SpotifyAuthBase):
|
||
|
"""
|
||
|
Implements Authorization Code Flow for Spotify's OAuth implementation.
|
||
|
"""
|
||
|
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
|
||
|
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
client_id=None,
|
||
|
client_secret=None,
|
||
|
redirect_uri=None,
|
||
|
state=None,
|
||
|
scope=None,
|
||
|
cache_path=None,
|
||
|
username=None,
|
||
|
proxies=None,
|
||
|
show_dialog=False,
|
||
|
requests_session=True,
|
||
|
requests_timeout=None,
|
||
|
open_browser=True,
|
||
|
cache_handler=None
|
||
|
):
|
||
|
"""
|
||
|
Creates a SpotifyOAuth object
|
||
|
|
||
|
Parameters:
|
||
|
* client_id: Must be supplied or set as environment variable
|
||
|
* client_secret: Must be supplied or set as environment variable
|
||
|
* redirect_uri: Must be supplied or set as environment variable
|
||
|
* state: Optional, no verification is performed
|
||
|
* scope: Optional, either a list of scopes or comma separated string of scopes.
|
||
|
e.g, "playlist-read-private,playlist-read-collaborative"
|
||
|
* cache_path: (deprecated) Optional, will otherwise be generated
|
||
|
(takes precedence over `username`)
|
||
|
* username: (deprecated) Optional or set as environment variable
|
||
|
(will set `cache_path` to `.cache-{username}`)
|
||
|
* proxies: Optional, proxy for the requests library to route through
|
||
|
* show_dialog: Optional, interpreted as boolean
|
||
|
* requests_session: A Requests session
|
||
|
* requests_timeout: Optional, tell Requests to stop waiting for a response after
|
||
|
a given number of seconds
|
||
|
* open_browser: Optional, whether or not the web browser should be opened to
|
||
|
authorize a user
|
||
|
* cache_handler: An instance of the `CacheHandler` class to handle
|
||
|
getting and saving cached authorization tokens.
|
||
|
Optional, will otherwise use `CacheFileHandler`.
|
||
|
(takes precedence over `cache_path` and `username`)
|
||
|
"""
|
||
|
|
||
|
super(SpotifyOAuth, self).__init__(requests_session)
|
||
|
|
||
|
self.client_id = client_id
|
||
|
self.client_secret = client_secret
|
||
|
self.redirect_uri = redirect_uri
|
||
|
self.state = state
|
||
|
self.scope = self._normalize_scope(scope)
|
||
|
if username or cache_path:
|
||
|
warnings.warn("Specifying cache_path or username as arguments to SpotifyOAuth " +
|
||
|
"will be deprecated. Instead, please create a CacheFileHandler " +
|
||
|
"instance with the desired cache_path and username and pass it " +
|
||
|
"to SpotifyOAuth as the cache_handler. For example:\n\n" +
|
||
|
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
|
||
|
"\thandler = CacheFileHandler(cache_path=cache_path, " +
|
||
|
"username=username)\n" +
|
||
|
"\tsp = spotipy.SpotifyOAuth(client_id, client_secret, " +
|
||
|
"redirect_uri," +
|
||
|
" cache_handler=handler)",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
if cache_handler:
|
||
|
warnings.warn("A cache_handler has been specified along with a cache_path or " +
|
||
|
"username. The cache_path and username arguments will be ignored.")
|
||
|
if cache_handler:
|
||
|
assert issubclass(cache_handler.__class__, CacheHandler), \
|
||
|
"cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
|
||
|
+ " != " + str(CacheHandler)
|
||
|
self.cache_handler = cache_handler
|
||
|
else:
|
||
|
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
|
||
|
self.cache_handler = CacheFileHandler(
|
||
|
username=username,
|
||
|
cache_path=cache_path
|
||
|
)
|
||
|
self.proxies = proxies
|
||
|
self.requests_timeout = requests_timeout
|
||
|
self.show_dialog = show_dialog
|
||
|
self.open_browser = open_browser
|
||
|
|
||
|
def validate_token(self, token_info):
|
||
|
if token_info is None:
|
||
|
return None
|
||
|
|
||
|
# if scopes don't match, then bail
|
||
|
if "scope" not in token_info or not self._is_scope_subset(
|
||
|
self.scope, token_info["scope"]
|
||
|
):
|
||
|
return None
|
||
|
|
||
|
if self.is_token_expired(token_info):
|
||
|
token_info = self.refresh_access_token(
|
||
|
token_info["refresh_token"]
|
||
|
)
|
||
|
|
||
|
return token_info
|
||
|
|
||
|
def get_authorize_url(self, state=None):
|
||
|
""" Gets the URL to use to authorize this app
|
||
|
"""
|
||
|
payload = {
|
||
|
"client_id": self.client_id,
|
||
|
"response_type": "code",
|
||
|
"redirect_uri": self.redirect_uri,
|
||
|
}
|
||
|
if self.scope:
|
||
|
payload["scope"] = self.scope
|
||
|
if state is None:
|
||
|
state = self.state
|
||
|
if state is not None:
|
||
|
payload["state"] = state
|
||
|
if self.show_dialog:
|
||
|
payload["show_dialog"] = True
|
||
|
|
||
|
urlparams = urllibparse.urlencode(payload)
|
||
|
|
||
|
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
|
||
|
|
||
|
def parse_response_code(self, url):
|
||
|
""" Parse the response code in the given response url
|
||
|
|
||
|
Parameters:
|
||
|
- url - the response url
|
||
|
"""
|
||
|
_, code = self.parse_auth_response_url(url)
|
||
|
if code is None:
|
||
|
return url
|
||
|
else:
|
||
|
return code
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_auth_response_url(url):
|
||
|
query_s = urlparse(url).query
|
||
|
form = dict(parse_qsl(query_s))
|
||
|
if "error" in form:
|
||
|
raise SpotifyOauthError("Received error from auth server: "
|
||
|
"{}".format(form["error"]),
|
||
|
error=form["error"])
|
||
|
return tuple(form.get(param) for param in ["state", "code"])
|
||
|
|
||
|
def _make_authorization_headers(self):
|
||
|
return _make_authorization_headers(self.client_id, self.client_secret)
|
||
|
|
||
|
def _open_auth_url(self):
|
||
|
auth_url = self.get_authorize_url()
|
||
|
try:
|
||
|
webbrowser.open(auth_url)
|
||
|
logger.info("Opened %s in your browser", auth_url)
|
||
|
except webbrowser.Error:
|
||
|
logger.error("Please navigate here: %s", auth_url)
|
||
|
|
||
|
def _get_auth_response_interactive(self, open_browser=False):
|
||
|
if open_browser:
|
||
|
self._open_auth_url()
|
||
|
prompt = "Enter the URL you were redirected to: "
|
||
|
else:
|
||
|
url = self.get_authorize_url()
|
||
|
prompt = (
|
||
|
"Go to the following URL: {}\n"
|
||
|
"Enter the URL you were redirected to: ".format(url)
|
||
|
)
|
||
|
response = self._get_user_input(prompt)
|
||
|
state, code = SpotifyOAuth.parse_auth_response_url(response)
|
||
|
if self.state is not None and self.state != state:
|
||
|
raise SpotifyStateError(self.state, state)
|
||
|
return code
|
||
|
|
||
|
def _get_auth_response_local_server(self, redirect_port):
|
||
|
server = start_local_http_server(redirect_port)
|
||
|
self._open_auth_url()
|
||
|
server.handle_request()
|
||
|
|
||
|
if server.error is not None:
|
||
|
raise server.error
|
||
|
elif self.state is not None and server.state != self.state:
|
||
|
raise SpotifyStateError(self.state, server.state)
|
||
|
elif server.auth_code is not None:
|
||
|
return server.auth_code
|
||
|
else:
|
||
|
raise SpotifyOauthError("Server listening on localhost has not been accessed")
|
||
|
|
||
|
def get_auth_response(self, open_browser=None):
|
||
|
logger.info('User authentication requires interaction with your '
|
||
|
'web browser. Once you enter your credentials and '
|
||
|
'give authorization, you will be redirected to '
|
||
|
'a url. Paste that url you were directed to to '
|
||
|
'complete the authorization.')
|
||
|
|
||
|
redirect_info = urlparse(self.redirect_uri)
|
||
|
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
|
||
|
|
||
|
if open_browser is None:
|
||
|
open_browser = self.open_browser
|
||
|
|
||
|
if (
|
||
|
open_browser
|
||
|
and redirect_host in ("127.0.0.1", "localhost")
|
||
|
and redirect_info.scheme == "http"
|
||
|
):
|
||
|
# Only start a local http server if a port is specified
|
||
|
if redirect_port:
|
||
|
return self._get_auth_response_local_server(redirect_port)
|
||
|
else:
|
||
|
logger.warning('Using `%s` as redirect URI without a port. '
|
||
|
'Specify a port (e.g. `%s:8080`) to allow '
|
||
|
'automatic retrieval of authentication code '
|
||
|
'instead of having to copy and paste '
|
||
|
'the URL your browser is redirected to.',
|
||
|
redirect_host, redirect_host)
|
||
|
|
||
|
return self._get_auth_response_interactive(open_browser=open_browser)
|
||
|
|
||
|
def get_authorization_code(self, response=None):
|
||
|
if response:
|
||
|
return self.parse_response_code(response)
|
||
|
return self.get_auth_response()
|
||
|
|
||
|
def get_access_token(self, code=None, as_dict=True, check_cache=True):
|
||
|
""" Gets the access token for the app given the code
|
||
|
|
||
|
Parameters:
|
||
|
- code - the response code
|
||
|
- as_dict - a boolean indicating if returning the access token
|
||
|
as a token_info dictionary, otherwise it will be returned
|
||
|
as a string.
|
||
|
"""
|
||
|
if as_dict:
|
||
|
warnings.warn(
|
||
|
"You're using 'as_dict = True'."
|
||
|
"get_access_token will return the token string directly in future "
|
||
|
"versions. Please adjust your code accordingly, or use "
|
||
|
"get_cached_token instead.",
|
||
|
DeprecationWarning,
|
||
|
stacklevel=2,
|
||
|
)
|
||
|
if check_cache:
|
||
|
token_info = self.validate_token(self.cache_handler.get_cached_token())
|
||
|
if token_info is not None:
|
||
|
if self.is_token_expired(token_info):
|
||
|
token_info = self.refresh_access_token(
|
||
|
token_info["refresh_token"]
|
||
|
)
|
||
|
return token_info if as_dict else token_info["access_token"]
|
||
|
|
||
|
payload = {
|
||
|
"redirect_uri": self.redirect_uri,
|
||
|
"code": code or self.get_auth_response(),
|
||
|
"grant_type": "authorization_code",
|
||
|
}
|
||
|
if self.scope:
|
||
|
payload["scope"] = self.scope
|
||
|
if self.state:
|
||
|
payload["state"] = self.state
|
||
|
|
||
|
headers = self._make_authorization_headers()
|
||
|
|
||
|
logger.debug(
|
||
|
"sending POST request to %s with Headers: %s and Body: %r",
|
||
|
self.OAUTH_TOKEN_URL, headers, payload
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
response = self._session.post(
|
||
|
self.OAUTH_TOKEN_URL,
|
||
|
data=payload,
|
||
|
headers=headers,
|
||
|
verify=True,
|
||
|
proxies=self.proxies,
|
||
|
timeout=self.requests_timeout,
|
||
|
)
|
||
|
response.raise_for_status()
|
||
|
token_info = response.json()
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return token_info if as_dict else token_info["access_token"]
|
||
|
except requests.exceptions.HTTPError as http_error:
|
||
|
self._handle_oauth_error(http_error)
|
||
|
|
||
|
def refresh_access_token(self, refresh_token):
|
||
|
payload = {
|
||
|
"refresh_token": refresh_token,
|
||
|
"grant_type": "refresh_token",
|
||
|
}
|
||
|
|
||
|
headers = self._make_authorization_headers()
|
||
|
|
||
|
logger.debug(
|
||
|
"sending POST request to %s with Headers: %s and Body: %r",
|
||
|
self.OAUTH_TOKEN_URL, headers, payload
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
response = self._session.post(
|
||
|
self.OAUTH_TOKEN_URL,
|
||
|
data=payload,
|
||
|
headers=headers,
|
||
|
proxies=self.proxies,
|
||
|
timeout=self.requests_timeout,
|
||
|
)
|
||
|
response.raise_for_status()
|
||
|
token_info = response.json()
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
if "refresh_token" not in token_info:
|
||
|
token_info["refresh_token"] = refresh_token
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return token_info
|
||
|
except requests.exceptions.HTTPError as http_error:
|
||
|
self._handle_oauth_error(http_error)
|
||
|
|
||
|
def _add_custom_values_to_token_info(self, token_info):
|
||
|
"""
|
||
|
Store some values that aren't directly provided by a Web API
|
||
|
response.
|
||
|
"""
|
||
|
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
|
||
|
token_info["scope"] = self.scope
|
||
|
return token_info
|
||
|
|
||
|
def get_cached_token(self):
|
||
|
warnings.warn("Calling get_cached_token directly on the SpotifyOAuth object will be " +
|
||
|
"deprecated. Instead, please specify a CacheFileHandler instance as " +
|
||
|
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
|
||
|
"get_cached_token method. You can replace:\n\tsp.get_cached_token()" +
|
||
|
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
return self.validate_token(self.cache_handler.get_cached_token())
|
||
|
|
||
|
def _save_token_info(self, token_info):
|
||
|
warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
|
||
|
"deprecated. Instead, please specify a CacheFileHandler instance as " +
|
||
|
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
|
||
|
"save_token_to_cache method.",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return None
|
||
|
|
||
|
|
||
|
class SpotifyPKCE(SpotifyAuthBase):
|
||
|
""" Implements PKCE Authorization Flow for client apps
|
||
|
|
||
|
This auth manager enables *user and non-user* endpoints with only
|
||
|
a client ID, redirect URI, and username. When the app requests
|
||
|
an access token for the first time, the user is prompted to
|
||
|
authorize the new client app. After authorizing the app, the client
|
||
|
app is then given both access and refresh tokens. This is the
|
||
|
preferred way of authorizing a mobile/desktop client.
|
||
|
|
||
|
"""
|
||
|
|
||
|
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
|
||
|
OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"
|
||
|
|
||
|
def __init__(self,
|
||
|
client_id=None,
|
||
|
redirect_uri=None,
|
||
|
state=None,
|
||
|
scope=None,
|
||
|
cache_path=None,
|
||
|
username=None,
|
||
|
proxies=None,
|
||
|
requests_timeout=None,
|
||
|
requests_session=True,
|
||
|
open_browser=True,
|
||
|
cache_handler=None):
|
||
|
"""
|
||
|
Creates Auth Manager with the PKCE Auth flow.
|
||
|
|
||
|
Parameters:
|
||
|
* client_id: Must be supplied or set as environment variable
|
||
|
* redirect_uri: Must be supplied or set as environment variable
|
||
|
* state: Optional, no verification is performed
|
||
|
* scope: Optional, either a list of scopes or comma separated string of scopes.
|
||
|
e.g, "playlist-read-private,playlist-read-collaborative"
|
||
|
* cache_path: (deprecated) Optional, will otherwise be generated
|
||
|
(takes precedence over `username`)
|
||
|
* username: (deprecated) Optional or set as environment variable
|
||
|
(will set `cache_path` to `.cache-{username}`)
|
||
|
* proxies: Optional, proxy for the requests library to route through
|
||
|
* requests_timeout: Optional, tell Requests to stop waiting for a response after
|
||
|
a given number of seconds
|
||
|
* requests_session: A Requests session
|
||
|
* open_browser: Optional, whether or not the web browser should be opened to
|
||
|
authorize a user
|
||
|
* cache_handler: An instance of the `CacheHandler` class to handle
|
||
|
getting and saving cached authorization tokens.
|
||
|
Optional, will otherwise use `CacheFileHandler`.
|
||
|
(takes precedence over `cache_path` and `username`)
|
||
|
"""
|
||
|
|
||
|
super(SpotifyPKCE, self).__init__(requests_session)
|
||
|
self.client_id = client_id
|
||
|
self.redirect_uri = redirect_uri
|
||
|
self.state = state
|
||
|
self.scope = self._normalize_scope(scope)
|
||
|
if username or cache_path:
|
||
|
warnings.warn("Specifying cache_path or username as arguments to SpotifyPKCE " +
|
||
|
"will be deprecated. Instead, please create a CacheFileHandler " +
|
||
|
"instance with the desired cache_path and username and pass it " +
|
||
|
"to SpotifyPKCE as the cache_handler. For example:\n\n" +
|
||
|
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
|
||
|
"\thandler = CacheFileHandler(cache_path=cache_path, " +
|
||
|
"username=username)\n" +
|
||
|
"\tsp = spotipy.SpotifyImplicitGrant(client_id, client_secret, " +
|
||
|
"redirect_uri, cache_handler=handler)",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
if cache_handler:
|
||
|
warnings.warn("A cache_handler has been specified along with a cache_path or " +
|
||
|
"username. The cache_path and username arguments will be ignored.")
|
||
|
if cache_handler:
|
||
|
assert issubclass(type(cache_handler), CacheHandler), \
|
||
|
"type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
|
||
|
self.cache_handler = cache_handler
|
||
|
else:
|
||
|
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
|
||
|
self.cache_handler = CacheFileHandler(
|
||
|
username=username,
|
||
|
cache_path=cache_path
|
||
|
)
|
||
|
self.proxies = proxies
|
||
|
self.requests_timeout = requests_timeout
|
||
|
|
||
|
self._code_challenge_method = "S256" # Spotify requires SHA256
|
||
|
self.code_verifier = None
|
||
|
self.code_challenge = None
|
||
|
self.authorization_code = None
|
||
|
self.open_browser = open_browser
|
||
|
|
||
|
def _get_code_verifier(self):
|
||
|
""" Spotify PCKE code verifier - See step 1 of the reference guide below
|
||
|
Reference:
|
||
|
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
|
||
|
"""
|
||
|
# Range (33,96) is used to select between 44-128 base64 characters for the
|
||
|
# next operation. The range looks weird because base64 is 6 bytes
|
||
|
import random
|
||
|
length = random.randint(33, 96)
|
||
|
|
||
|
# The seeded length generates between a 44 and 128 base64 characters encoded string
|
||
|
try:
|
||
|
import secrets
|
||
|
verifier = secrets.token_urlsafe(length)
|
||
|
except ImportError: # For python 3.5 support
|
||
|
import base64
|
||
|
import os
|
||
|
rand_bytes = os.urandom(length)
|
||
|
verifier = base64.urlsafe_b64encode(rand_bytes).decode('utf-8').replace('=', '')
|
||
|
return verifier
|
||
|
|
||
|
def _get_code_challenge(self):
|
||
|
""" Spotify PCKE code challenge - See step 1 of the reference guide below
|
||
|
Reference:
|
||
|
https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
|
||
|
"""
|
||
|
import base64
|
||
|
import hashlib
|
||
|
code_challenge_digest = hashlib.sha256(self.code_verifier.encode('utf-8')).digest()
|
||
|
code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8')
|
||
|
return code_challenge.replace('=', '')
|
||
|
|
||
|
def get_authorize_url(self, state=None):
|
||
|
""" Gets the URL to use to authorize this app """
|
||
|
if not self.code_challenge:
|
||
|
self.get_pkce_handshake_parameters()
|
||
|
payload = {
|
||
|
"client_id": self.client_id,
|
||
|
"response_type": "code",
|
||
|
"redirect_uri": self.redirect_uri,
|
||
|
"code_challenge_method": self._code_challenge_method,
|
||
|
"code_challenge": self.code_challenge
|
||
|
}
|
||
|
if self.scope:
|
||
|
payload["scope"] = self.scope
|
||
|
if state is None:
|
||
|
state = self.state
|
||
|
if state is not None:
|
||
|
payload["state"] = state
|
||
|
urlparams = urllibparse.urlencode(payload)
|
||
|
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
|
||
|
|
||
|
def _open_auth_url(self, state=None):
|
||
|
auth_url = self.get_authorize_url(state)
|
||
|
try:
|
||
|
webbrowser.open(auth_url)
|
||
|
logger.info("Opened %s in your browser", auth_url)
|
||
|
except webbrowser.Error:
|
||
|
logger.error("Please navigate here: %s", auth_url)
|
||
|
|
||
|
def _get_auth_response(self, open_browser=None):
|
||
|
logger.info('User authentication requires interaction with your '
|
||
|
'web browser. Once you enter your credentials and '
|
||
|
'give authorization, you will be redirected to '
|
||
|
'a url. Paste that url you were directed to to '
|
||
|
'complete the authorization.')
|
||
|
|
||
|
redirect_info = urlparse(self.redirect_uri)
|
||
|
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
|
||
|
|
||
|
if open_browser is None:
|
||
|
open_browser = self.open_browser
|
||
|
|
||
|
if (
|
||
|
open_browser
|
||
|
and redirect_host in ("127.0.0.1", "localhost")
|
||
|
and redirect_info.scheme == "http"
|
||
|
):
|
||
|
# Only start a local http server if a port is specified
|
||
|
if redirect_port:
|
||
|
return self._get_auth_response_local_server(redirect_port)
|
||
|
else:
|
||
|
logger.warning('Using `%s` as redirect URI without a port. '
|
||
|
'Specify a port (e.g. `%s:8080`) to allow '
|
||
|
'automatic retrieval of authentication code '
|
||
|
'instead of having to copy and paste '
|
||
|
'the URL your browser is redirected to.',
|
||
|
redirect_host, redirect_host)
|
||
|
return self._get_auth_response_interactive(open_browser=open_browser)
|
||
|
|
||
|
def _get_auth_response_local_server(self, redirect_port):
|
||
|
server = start_local_http_server(redirect_port)
|
||
|
self._open_auth_url()
|
||
|
server.handle_request()
|
||
|
|
||
|
if self.state is not None and server.state != self.state:
|
||
|
raise SpotifyStateError(self.state, server.state)
|
||
|
|
||
|
if server.auth_code is not None:
|
||
|
return server.auth_code
|
||
|
elif server.error is not None:
|
||
|
raise SpotifyOauthError("Received error from OAuth server: {}".format(server.error))
|
||
|
else:
|
||
|
raise SpotifyOauthError("Server listening on localhost has not been accessed")
|
||
|
|
||
|
def _get_auth_response_interactive(self, open_browser=False):
|
||
|
if open_browser or self.open_browser:
|
||
|
self._open_auth_url()
|
||
|
prompt = "Enter the URL you were redirected to: "
|
||
|
else:
|
||
|
url = self.get_authorize_url()
|
||
|
prompt = (
|
||
|
"Go to the following URL: {}\n"
|
||
|
"Enter the URL you were redirected to: ".format(url)
|
||
|
)
|
||
|
response = self._get_user_input(prompt)
|
||
|
state, code = self.parse_auth_response_url(response)
|
||
|
if self.state is not None and self.state != state:
|
||
|
raise SpotifyStateError(self.state, state)
|
||
|
return code
|
||
|
|
||
|
def get_authorization_code(self, response=None):
|
||
|
if response:
|
||
|
return self.parse_response_code(response)
|
||
|
return self._get_auth_response()
|
||
|
|
||
|
def validate_token(self, token_info):
|
||
|
if token_info is None:
|
||
|
return None
|
||
|
|
||
|
# if scopes don't match, then bail
|
||
|
if "scope" not in token_info or not self._is_scope_subset(
|
||
|
self.scope, token_info["scope"]
|
||
|
):
|
||
|
return None
|
||
|
|
||
|
if self.is_token_expired(token_info):
|
||
|
token_info = self.refresh_access_token(
|
||
|
token_info["refresh_token"]
|
||
|
)
|
||
|
|
||
|
return token_info
|
||
|
|
||
|
def _add_custom_values_to_token_info(self, token_info):
|
||
|
"""
|
||
|
Store some values that aren't directly provided by a Web API
|
||
|
response.
|
||
|
"""
|
||
|
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
|
||
|
return token_info
|
||
|
|
||
|
def get_pkce_handshake_parameters(self):
|
||
|
self.code_verifier = self._get_code_verifier()
|
||
|
self.code_challenge = self._get_code_challenge()
|
||
|
|
||
|
def get_access_token(self, code=None, check_cache=True):
|
||
|
""" Gets the access token for the app
|
||
|
|
||
|
If the code is not given and no cached token is used, an
|
||
|
authentication window will be shown to the user to get a new
|
||
|
code.
|
||
|
|
||
|
Parameters:
|
||
|
- code - the response code from authentication
|
||
|
- check_cache - if true, checks for a locally stored token
|
||
|
before requesting a new token
|
||
|
"""
|
||
|
|
||
|
if check_cache:
|
||
|
token_info = self.validate_token(self.cache_handler.get_cached_token())
|
||
|
if token_info is not None:
|
||
|
if self.is_token_expired(token_info):
|
||
|
token_info = self.refresh_access_token(
|
||
|
token_info["refresh_token"]
|
||
|
)
|
||
|
return token_info["access_token"]
|
||
|
|
||
|
if self.code_verifier is None or self.code_challenge is None:
|
||
|
self.get_pkce_handshake_parameters()
|
||
|
|
||
|
payload = {
|
||
|
"client_id": self.client_id,
|
||
|
"grant_type": "authorization_code",
|
||
|
"code": code or self.get_authorization_code(),
|
||
|
"redirect_uri": self.redirect_uri,
|
||
|
"code_verifier": self.code_verifier
|
||
|
}
|
||
|
|
||
|
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||
|
|
||
|
logger.debug(
|
||
|
"sending POST request to %s with Headers: %s and Body: %r",
|
||
|
self.OAUTH_TOKEN_URL, headers, payload
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
response = self._session.post(
|
||
|
self.OAUTH_TOKEN_URL,
|
||
|
data=payload,
|
||
|
headers=headers,
|
||
|
verify=True,
|
||
|
proxies=self.proxies,
|
||
|
timeout=self.requests_timeout,
|
||
|
)
|
||
|
response.raise_for_status()
|
||
|
token_info = response.json()
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return token_info["access_token"]
|
||
|
except requests.exceptions.HTTPError as http_error:
|
||
|
self._handle_oauth_error(http_error)
|
||
|
|
||
|
def refresh_access_token(self, refresh_token):
|
||
|
payload = {
|
||
|
"refresh_token": refresh_token,
|
||
|
"grant_type": "refresh_token",
|
||
|
"client_id": self.client_id,
|
||
|
}
|
||
|
|
||
|
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||
|
|
||
|
logger.debug(
|
||
|
"sending POST request to %s with Headers: %s and Body: %r",
|
||
|
self.OAUTH_TOKEN_URL, headers, payload
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
response = self._session.post(
|
||
|
self.OAUTH_TOKEN_URL,
|
||
|
data=payload,
|
||
|
headers=headers,
|
||
|
proxies=self.proxies,
|
||
|
timeout=self.requests_timeout,
|
||
|
)
|
||
|
response.raise_for_status()
|
||
|
token_info = response.json()
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
if "refresh_token" not in token_info:
|
||
|
token_info["refresh_token"] = refresh_token
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return token_info
|
||
|
except requests.exceptions.HTTPError as http_error:
|
||
|
self._handle_oauth_error(http_error)
|
||
|
|
||
|
def parse_response_code(self, url):
|
||
|
""" Parse the response code in the given response url
|
||
|
|
||
|
Parameters:
|
||
|
- url - the response url
|
||
|
"""
|
||
|
_, code = self.parse_auth_response_url(url)
|
||
|
if code is None:
|
||
|
return url
|
||
|
else:
|
||
|
return code
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_auth_response_url(url):
|
||
|
return SpotifyOAuth.parse_auth_response_url(url)
|
||
|
|
||
|
def get_cached_token(self):
|
||
|
warnings.warn("Calling get_cached_token directly on the SpotifyPKCE object will be " +
|
||
|
"deprecated. Instead, please specify a CacheFileHandler instance as " +
|
||
|
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
|
||
|
"get_cached_token method. You can replace:\n\tsp.get_cached_token()" +
|
||
|
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
return self.validate_token(self.cache_handler.get_cached_token())
|
||
|
|
||
|
def _save_token_info(self, token_info):
|
||
|
warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
|
||
|
"deprecated. Instead, please specify a CacheFileHandler instance as " +
|
||
|
"the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
|
||
|
"save_token_to_cache method.",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return None
|
||
|
|
||
|
|
||
|
class SpotifyImplicitGrant(SpotifyAuthBase):
|
||
|
""" Implements Implicit Grant Flow for client apps
|
||
|
|
||
|
This auth manager enables *user and non-user* endpoints with only
|
||
|
a client secret, redirect uri, and username. The user will need to
|
||
|
copy and paste a URI from the browser every hour.
|
||
|
|
||
|
Security Warning
|
||
|
-----------------
|
||
|
The OAuth standard no longer recommends the Implicit Grant Flow for
|
||
|
client-side code. Spotify has implemented the OAuth-suggested PKCE
|
||
|
extension that removes the need for a client secret in the
|
||
|
Authentication Code flow. Use the SpotifyPKCE auth manager instead
|
||
|
of SpotifyImplicitGrant.
|
||
|
|
||
|
SpotifyPKCE contains all of the functionality of
|
||
|
SpotifyImplicitGrant, plus automatic response retrieval and
|
||
|
refreshable tokens. Only a few replacements need to be made:
|
||
|
|
||
|
* get_auth_response()['access_token'] ->
|
||
|
get_access_token(get_authorization_code())
|
||
|
* get_auth_response() ->
|
||
|
get_access_token(get_authorization_code()); get_cached_token()
|
||
|
* parse_response_token(url)['access_token'] ->
|
||
|
get_access_token(parse_response_code(url))
|
||
|
* parse_response_token(url) ->
|
||
|
get_access_token(parse_response_code(url)); get_cached_token()
|
||
|
|
||
|
The security concern in the Implicit Grant flow is that the token is
|
||
|
returned in the URL and can be intercepted through the browser. A
|
||
|
request with an authorization code and proof of origin could not be
|
||
|
easily intercepted without a compromised network.
|
||
|
"""
|
||
|
OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
|
||
|
|
||
|
def __init__(self,
|
||
|
client_id=None,
|
||
|
redirect_uri=None,
|
||
|
state=None,
|
||
|
scope=None,
|
||
|
cache_path=None,
|
||
|
username=None,
|
||
|
show_dialog=False,
|
||
|
cache_handler=None):
|
||
|
""" Creates Auth Manager using the Implicit Grant flow
|
||
|
|
||
|
**See help(SpotifyImplicitGrant) for full Security Warning**
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
* client_id: Must be supplied or set as environment variable
|
||
|
* redirect_uri: Must be supplied or set as environment variable
|
||
|
* state: May be supplied, no verification is performed
|
||
|
* scope: Optional, either a list of scopes or comma separated string of scopes.
|
||
|
e.g, "playlist-read-private,playlist-read-collaborative"
|
||
|
* cache_handler: An instance of the `CacheHandler` class to handle
|
||
|
getting and saving cached authorization tokens.
|
||
|
May be supplied, will otherwise use `CacheFileHandler`.
|
||
|
(takes precedence over `cache_path` and `username`)
|
||
|
* cache_path: (deprecated) May be supplied, will otherwise be generated
|
||
|
(takes precedence over `username`)
|
||
|
* username: (deprecated) May be supplied or set as environment variable
|
||
|
(will set `cache_path` to `.cache-{username}`)
|
||
|
* show_dialog: Interpreted as boolean
|
||
|
"""
|
||
|
logger.warning("The OAuth standard no longer recommends the Implicit "
|
||
|
"Grant Flow for client-side code. Use the SpotifyPKCE "
|
||
|
"auth manager instead of SpotifyImplicitGrant. For "
|
||
|
"more details and a guide to switching, see "
|
||
|
"help(SpotifyImplicitGrant).")
|
||
|
|
||
|
self.client_id = client_id
|
||
|
self.redirect_uri = redirect_uri
|
||
|
self.state = state
|
||
|
if username or cache_path:
|
||
|
warnings.warn("Specifying cache_path or username as arguments to " +
|
||
|
"SpotifyImplicitGrant will be deprecated. Instead, please create " +
|
||
|
"a CacheFileHandler instance with the desired cache_path and " +
|
||
|
"username and pass it to SpotifyImplicitGrant as the " +
|
||
|
"cache_handler. For example:\n\n" +
|
||
|
"\tfrom spotipy.oauth2 import CacheFileHandler\n" +
|
||
|
"\thandler = CacheFileHandler(cache_path=cache_path, " +
|
||
|
"username=username)\n" +
|
||
|
"\tsp = spotipy.SpotifyImplicitGrant(client_id, client_secret, " +
|
||
|
"redirect_uri, cache_handler=handler)",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
if cache_handler:
|
||
|
warnings.warn("A cache_handler has been specified along with a cache_path or " +
|
||
|
"username. The cache_path and username arguments will be ignored.")
|
||
|
if cache_handler:
|
||
|
assert issubclass(type(cache_handler), CacheHandler), \
|
||
|
"type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
|
||
|
self.cache_handler = cache_handler
|
||
|
else:
|
||
|
username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
|
||
|
self.cache_handler = CacheFileHandler(
|
||
|
username=username,
|
||
|
cache_path=cache_path
|
||
|
)
|
||
|
self.scope = self._normalize_scope(scope)
|
||
|
self.show_dialog = show_dialog
|
||
|
self._session = None # As to not break inherited __del__
|
||
|
|
||
|
def validate_token(self, token_info):
|
||
|
if token_info is None:
|
||
|
return None
|
||
|
|
||
|
# if scopes don't match, then bail
|
||
|
if "scope" not in token_info or not self._is_scope_subset(
|
||
|
self.scope, token_info["scope"]
|
||
|
):
|
||
|
return None
|
||
|
|
||
|
if self.is_token_expired(token_info):
|
||
|
return None
|
||
|
|
||
|
return token_info
|
||
|
|
||
|
def get_access_token(self,
|
||
|
state=None,
|
||
|
response=None,
|
||
|
check_cache=True):
|
||
|
""" Gets Auth Token from cache (preferred) or user interaction
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
* state: May be given, overrides (without changing) self.state
|
||
|
* response: URI with token, can break expiration checks
|
||
|
* check_cache: Interpreted as boolean
|
||
|
"""
|
||
|
if check_cache:
|
||
|
token_info = self.validate_token(self.cache_handler.get_cached_token())
|
||
|
if not (token_info is None or self.is_token_expired(token_info)):
|
||
|
return token_info["access_token"]
|
||
|
|
||
|
if response:
|
||
|
token_info = self.parse_response_token(response)
|
||
|
else:
|
||
|
token_info = self.get_auth_response(state)
|
||
|
token_info = self._add_custom_values_to_token_info(token_info)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
|
||
|
return token_info["access_token"]
|
||
|
|
||
|
def get_authorize_url(self, state=None):
|
||
|
""" Gets the URL to use to authorize this app """
|
||
|
payload = {
|
||
|
"client_id": self.client_id,
|
||
|
"response_type": "token",
|
||
|
"redirect_uri": self.redirect_uri,
|
||
|
}
|
||
|
if self.scope:
|
||
|
payload["scope"] = self.scope
|
||
|
if state is None:
|
||
|
state = self.state
|
||
|
if state is not None:
|
||
|
payload["state"] = state
|
||
|
if self.show_dialog:
|
||
|
payload["show_dialog"] = True
|
||
|
|
||
|
urlparams = urllibparse.urlencode(payload)
|
||
|
|
||
|
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
|
||
|
|
||
|
def parse_response_token(self, url, state=None):
|
||
|
""" Parse the response code in the given response url """
|
||
|
remote_state, token, t_type, exp_in = self.parse_auth_response_url(url)
|
||
|
if state is None:
|
||
|
state = self.state
|
||
|
if state is not None and remote_state != state:
|
||
|
raise SpotifyStateError(state, remote_state)
|
||
|
return {"access_token": token, "token_type": t_type,
|
||
|
"expires_in": exp_in, "state": state}
|
||
|
|
||
|
@staticmethod
|
||
|
def parse_auth_response_url(url):
|
||
|
url_components = urlparse(url)
|
||
|
fragment_s = url_components.fragment
|
||
|
query_s = url_components.query
|
||
|
form = dict(i.split('=') for i
|
||
|
in (fragment_s or query_s or url).split('&'))
|
||
|
if "error" in form:
|
||
|
raise SpotifyOauthError("Received error from auth server: "
|
||
|
"{}".format(form["error"]),
|
||
|
state=form["state"])
|
||
|
if "expires_in" in form:
|
||
|
form["expires_in"] = int(form["expires_in"])
|
||
|
return tuple(form.get(param) for param in ["state", "access_token",
|
||
|
"token_type", "expires_in"])
|
||
|
|
||
|
def _open_auth_url(self, state=None):
|
||
|
auth_url = self.get_authorize_url(state)
|
||
|
try:
|
||
|
webbrowser.open(auth_url)
|
||
|
logger.info("Opened %s in your browser", auth_url)
|
||
|
except webbrowser.Error:
|
||
|
logger.error("Please navigate here: %s", auth_url)
|
||
|
|
||
|
def get_auth_response(self, state=None):
|
||
|
""" Gets a new auth **token** with user interaction """
|
||
|
logger.info('User authentication requires interaction with your '
|
||
|
'web browser. Once you enter your credentials and '
|
||
|
'give authorization, you will be redirected to '
|
||
|
'a url. Paste that url you were directed to to '
|
||
|
'complete the authorization.')
|
||
|
|
||
|
redirect_info = urlparse(self.redirect_uri)
|
||
|
redirect_host, redirect_port = get_host_port(redirect_info.netloc)
|
||
|
# Implicit Grant tokens are returned in a hash fragment
|
||
|
# which is only available to the browser. Therefore, interactive
|
||
|
# URL retrieval is required.
|
||
|
if (redirect_host in ("127.0.0.1", "localhost")
|
||
|
and redirect_info.scheme == "http" and redirect_port):
|
||
|
logger.warning('Using a local redirect URI with a '
|
||
|
'port, likely expecting automatic '
|
||
|
'retrieval. Due to technical limitations, '
|
||
|
'the authentication token cannot be '
|
||
|
'automatically retrieved and must be '
|
||
|
'copied and pasted.')
|
||
|
|
||
|
self._open_auth_url(state)
|
||
|
logger.info('Paste that url you were directed to in order to '
|
||
|
'complete the authorization')
|
||
|
response = SpotifyImplicitGrant._get_user_input("Enter the URL you "
|
||
|
"were redirected to: ")
|
||
|
return self.parse_response_token(response, state)
|
||
|
|
||
|
def _add_custom_values_to_token_info(self, token_info):
|
||
|
"""
|
||
|
Store some values that aren't directly provided by a Web API
|
||
|
response.
|
||
|
"""
|
||
|
token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
|
||
|
token_info["scope"] = self.scope
|
||
|
return token_info
|
||
|
|
||
|
def get_cached_token(self):
|
||
|
warnings.warn("Calling get_cached_token directly on the SpotifyImplicitGrant " +
|
||
|
"object will be deprecated. Instead, please specify a " +
|
||
|
"CacheFileHandler instance as the cache_handler in SpotifyOAuth " +
|
||
|
"and use the CacheFileHandler's get_cached_token method. " +
|
||
|
"You can replace:\n\tsp.get_cached_token()" +
|
||
|
"\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
return self.validate_token(self.cache_handler.get_cached_token())
|
||
|
|
||
|
def _save_token_info(self, token_info):
|
||
|
warnings.warn("Calling _save_token_info directly on the SpotifyImplicitGrant " +
|
||
|
"object will be deprecated. Instead, please specify a " +
|
||
|
"CacheFileHandler instance as the cache_handler in SpotifyOAuth " +
|
||
|
"and use the CacheFileHandler's save_token_to_cache method.",
|
||
|
DeprecationWarning
|
||
|
)
|
||
|
self.cache_handler.save_token_to_cache(token_info)
|
||
|
return None
|
||
|
|
||
|
|
||
|
class RequestHandler(BaseHTTPRequestHandler):
|
||
|
def do_GET(self):
|
||
|
self.server.auth_code = self.server.error = None
|
||
|
try:
|
||
|
state, auth_code = SpotifyOAuth.parse_auth_response_url(self.path)
|
||
|
self.server.state = state
|
||
|
self.server.auth_code = auth_code
|
||
|
except SpotifyOauthError as error:
|
||
|
self.server.error = error
|
||
|
|
||
|
self.send_response(200)
|
||
|
self.send_header("Content-Type", "text/html")
|
||
|
self.end_headers()
|
||
|
|
||
|
if self.server.auth_code:
|
||
|
status = "successful"
|
||
|
elif self.server.error:
|
||
|
status = "failed ({})".format(self.server.error)
|
||
|
else:
|
||
|
self._write("<html><body><h1>Invalid request</h1></body></html>")
|
||
|
return
|
||
|
|
||
|
self._write("""<html>
|
||
|
<script>
|
||
|
window.close()
|
||
|
</script>
|
||
|
<body>
|
||
|
<h1>Authentication status: {}</h1>
|
||
|
This window can be closed.
|
||
|
<script>
|
||
|
window.close()
|
||
|
</script>
|
||
|
<button class="closeButton" style="cursor: pointer" onclick="window.close();">Close Window</button>
|
||
|
</body>
|
||
|
</html>""".format(status))
|
||
|
|
||
|
def _write(self, text):
|
||
|
return self.wfile.write(text.encode("utf-8"))
|
||
|
|
||
|
def log_message(self, format, *args):
|
||
|
return
|
||
|
|
||
|
|
||
|
def start_local_http_server(port, handler=RequestHandler):
|
||
|
server = HTTPServer(("127.0.0.1", port), handler)
|
||
|
server.allow_reuse_address = True
|
||
|
server.auth_code = None
|
||
|
server.auth_token_form = None
|
||
|
server.error = None
|
||
|
return server
|