rana-cli/wakatime/packages/requests_ntlm/requests_ntlm.py

219 lines
7.9 KiB
Python
Raw Normal View History

2017-02-15 23:02:05 +00:00
import hashlib
import sys
import warnings
from ntlm_auth import ntlm
from requests.auth import AuthBase
from requests.packages.urllib3.response import HTTPResponse
class HttpNtlmAuth(AuthBase):
"""
HTTP NTLM Authentication Handler for Requests.
Supports pass-the-hash.
"""
def __init__(self, username, password, session=None):
"""Create an authentication handler for NTLM over HTTP.
:param str username: Username in 'domain\\username' format
:param str password: Password
:param str session: Unused. Kept for backwards-compatibility.
"""
if ntlm is None:
raise Exception("NTLM libraries unavailable")
# parse the username
try:
self.domain, self.username = username.split('\\', 1)
except ValueError:
self.username = username
self.domain = ''
if self.domain:
self.domain = self.domain.upper()
self.password = password
# This exposes the encrypt/decrypt methods used to encrypt and decrypt messages
# sent after ntlm authentication. These methods are utilised by libraries that
# call requests_ntlm to encrypt and decrypt the messages sent after authentication
self.session_security = None
def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
response, auth_type, args):
# Get the certificate of the server if using HTTPS for CBT
server_certificate_hash = _get_server_cert(response)
"""Attempt to authenticate using HTTP NTLM challenge/response."""
if auth_header in response.request.headers:
return response
content_length = int(
response.request.headers.get('Content-Length', '0'), base=10)
if hasattr(response.request.body, 'seek'):
if content_length > 0:
response.request.body.seek(-content_length, 1)
else:
response.request.body.seek(0, 0)
# Consume content and release the original connection
# to allow our new request to reuse the same one.
response.content
response.raw.release_conn()
request = response.request.copy()
# ntlm returns the headers as a base64 encoded bytestring. Convert to
# a string.
context = ntlm.Ntlm()
negotiate_message = context.create_negotiate_message(self.domain).decode('ascii')
auth = u'%s %s' % (auth_type, negotiate_message)
request.headers[auth_header] = auth
# A streaming response breaks authentication.
# This can be fixed by not streaming this request, which is safe
# because the returned response3 will still have stream=True set if
# specified in args. In addition, we expect this request to give us a
# challenge and not the real content, so the content will be short
# anyway.
args_nostream = dict(args, stream=False)
response2 = response.connection.send(request, **args_nostream)
# needed to make NTLM auth compatible with requests-2.3.0
# Consume content and release the original connection
# to allow our new request to reuse the same one.
response2.content
response2.raw.release_conn()
request = response2.request.copy()
# this is important for some web applications that store
# authentication-related info in cookies (it took a long time to
# figure out)
if response2.headers.get('set-cookie'):
request.headers['Cookie'] = response2.headers.get('set-cookie')
# get the challenge
auth_header_value = response2.headers[auth_header_field]
auth_strip = auth_type + ' '
ntlm_header_value = next(
s for s in (val.lstrip() for val in auth_header_value.split(','))
if s.startswith(auth_strip)
).strip()
# Parse the challenge in the ntlm context
context.parse_challenge_message(ntlm_header_value[len(auth_strip):])
# build response
# Get the response based on the challenge message
authenticate_message = context.create_authenticate_message(
self.username,
self.password,
self.domain,
server_certificate_hash=server_certificate_hash
)
authenticate_message = authenticate_message.decode('ascii')
auth = u'%s %s' % (auth_type, authenticate_message)
request.headers[auth_header] = auth
response3 = response2.connection.send(request, **args)
# Update the history.
response3.history.append(response)
response3.history.append(response2)
# Get the session_security object created by ntlm-auth for signing and sealing of messages
self.session_security = context.session_security
return response3
def response_hook(self, r, **kwargs):
"""The actual hook handler."""
if r.status_code == 401:
# Handle server auth.
www_authenticate = r.headers.get('www-authenticate', '').lower()
auth_type = _auth_type_from_header(www_authenticate)
if auth_type is not None:
return self.retry_using_http_NTLM_auth(
'www-authenticate',
'Authorization',
r,
auth_type,
kwargs
)
elif r.status_code == 407:
# If we didn't have server auth, do proxy auth.
proxy_authenticate = r.headers.get(
'proxy-authenticate', ''
).lower()
auth_type = _auth_type_from_header(proxy_authenticate)
if auth_type is not None:
return self.retry_using_http_NTLM_auth(
'proxy-authenticate',
'Proxy-authorization',
r,
auth_type,
kwargs
)
return r
def __call__(self, r):
# we must keep the connection because NTLM authenticates the
# connection, not single requests
r.headers["Connection"] = "Keep-Alive"
r.register_hook('response', self.response_hook)
return r
def _auth_type_from_header(header):
"""
Given a WWW-Authenticate or Proxy-Authenticate header, returns the
authentication type to use. We prefer NTLM over Negotiate if the server
suppports it.
"""
if 'ntlm' in header:
return 'NTLM'
elif 'negotiate' in header:
return 'Negotiate'
return None
def _get_server_cert(response):
"""
Get the certificate at the request_url and return it as a SHA256 hash. Will get the raw socket from the
original response from the server. This socket is then checked if it is an SSL socket and then used to
get the hash of the certificate. The certificate hash is then used with NTLMv2 authentication for
Channel Binding Tokens support. If the raw object is not a urllib3 HTTPReponse (default with requests)
then no certificate will be returned.
:param response: The original 401 response from the server
:return: SHA256 hash of the DER encoded certificate at the request_url or None if not a HTTPS endpoint
"""
certificate_hash = None
raw_response = response.raw
if isinstance(raw_response, HTTPResponse):
if sys.version_info > (3, 0):
socket = raw_response._fp.fp.raw._sock
else:
socket = raw_response._fp.fp._sock
try:
server_certificate = socket.getpeercert(True)
except AttributeError:
pass
else:
hash_object = hashlib.sha256(server_certificate)
certificate_hash = hash_object.hexdigest().upper()
else:
warnings.warn("Requests is running with a non urllib3 backend, cannot retrieve server certificate for CBT", NoCertificateRetrievedWarning)
return certificate_hash
class NoCertificateRetrievedWarning(Warning):
pass