D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
lib
/
python2.7
/
site-packages
/
urllib3
/
contrib
/
Filename :
pyopenssl.py
back
Copy
'''SSL with SNI_-support for Python 2. Follow these instructions if you would like to verify SSL certificates in Python 2. Note, the default libraries do *not* do certificate checking; you need to do additional work to validate certificates yourself. This needs the following packages installed: * pyOpenSSL (tested with 0.13) * ndg-httpsclient (tested with 0.3.2) * pyasn1 (tested with 0.1.6) You can install them with the following command: pip install pyopenssl ndg-httpsclient pyasn1 To activate certificate checking, call :func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code before you begin making HTTP requests. This can be done in a ``sitecustomize`` module, or at any other time before your application begins using ``urllib3``, like this:: try: import urllib3.contrib.pyopenssl urllib3.contrib.pyopenssl.inject_into_urllib3() except ImportError: pass Now you can use :mod:`urllib3` as you normally would, and it will support SNI when the required modules are installed. Activating this module also has the positive side effect of disabling SSL/TLS compression in Python 2 (see `CRIME attack`_). If you want to configure the default list of supported cipher suites, you can set the ``urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST`` variable. Module Variables ---------------- :var DEFAULT_SSL_CIPHER_LIST: The list of supported SSL/TLS cipher suites. Default: ``ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES: ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS`` .. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication .. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit) ''' try: from ndg.httpsclient.ssl_peer_verification import SUBJ_ALT_NAME_SUPPORT from ndg.httpsclient.subj_alt_name import SubjectAltName as BaseSubjectAltName except SyntaxError as e: raise ImportError(e) import OpenSSL.SSL from pyasn1.codec.der import decoder as der_decoder from pyasn1.type import univ, constraint from socket import _fileobject, timeout import ssl import select from .. import connection from .. import util __all__ = ['inject_into_urllib3', 'extract_from_urllib3'] # SNI only *really* works if we can read the subjectAltName of certificates. HAS_SNI = SUBJ_ALT_NAME_SUPPORT # Map from urllib3 to PyOpenSSL compatible parameter-values. _openssl_versions = { ssl.PROTOCOL_SSLv23: OpenSSL.SSL.SSLv23_METHOD, ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, } try: _openssl_versions.update({ssl.PROTOCOL_SSLv3: OpenSSL.SSL.SSLv3_METHOD}) except AttributeError: pass _openssl_verify = { ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, } # A secure default. # Sources for more information on TLS ciphers: # # - https://wiki.mozilla.org/Security/Server_Side_TLS # - https://www.ssllabs.com/projects/best-practices/index.html # - https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/ # # The general intent is: # - Prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE), # - prefer ECDHE over DHE for better performance, # - prefer any AES-GCM over any AES-CBC for better performance and security, # - use 3DES as fallback which is secure but slow, # - disable NULL authentication, MD5 MACs and DSS for security reasons. DEFAULT_SSL_CIPHER_LIST = "ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:" + \ "ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:" + \ "!aNULL:!MD5:!DSS" orig_util_HAS_SNI = util.HAS_SNI orig_connection_ssl_wrap_socket = connection.ssl_wrap_socket def inject_into_urllib3(): 'Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.' connection.ssl_wrap_socket = ssl_wrap_socket util.HAS_SNI = HAS_SNI def extract_from_urllib3(): 'Undo monkey-patching by :func:`inject_into_urllib3`.' connection.ssl_wrap_socket = orig_connection_ssl_wrap_socket util.HAS_SNI = orig_util_HAS_SNI ### Note: This is a slightly bug-fixed version of same from ndg-httpsclient. class SubjectAltName(BaseSubjectAltName): '''ASN.1 implementation for subjectAltNames support''' # There is no limit to how many SAN certificates a certificate may have, # however this needs to have some limit so we'll set an arbitrarily high # limit. sizeSpec = univ.SequenceOf.sizeSpec + \ constraint.ValueSizeConstraint(1, 1024) ### Note: This is a slightly bug-fixed version of same from ndg-httpsclient. def get_subj_alt_name(peer_cert): # Search through extensions dns_name = [] if not SUBJ_ALT_NAME_SUPPORT: return dns_name general_names = SubjectAltName() for i in range(peer_cert.get_extension_count()): ext = peer_cert.get_extension(i) ext_name = ext.get_short_name() if ext_name != 'subjectAltName': continue # PyOpenSSL returns extension data in ASN.1 encoded form ext_dat = ext.get_data() decoded_dat = der_decoder.decode(ext_dat, asn1Spec=general_names) for name in decoded_dat: if not isinstance(name, SubjectAltName): continue for entry in range(len(name)): component = name.getComponentByPosition(entry) if component.getName() != 'dNSName': continue dns_name.append(str(component.getComponent())) return dns_name class WrappedSocket(object): '''API-compatibility wrapper for Python OpenSSL's Connection-class. Note: _makefile_refs, _drop() and _reuse() are needed for the garbage collector of pypy. ''' def __init__(self, connection, socket, suppress_ragged_eofs=True): self.connection = connection self.socket = socket self.suppress_ragged_eofs = suppress_ragged_eofs self._makefile_refs = 0 def fileno(self): return self.socket.fileno() def makefile(self, mode, bufsize=-1): self._makefile_refs += 1 return _fileobject(self, mode, bufsize, close=True) def recv(self, *args, **kwargs): try: data = self.connection.recv(*args, **kwargs) except OpenSSL.SSL.SysCallError as e: if self.suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'): return b'' else: raise except OpenSSL.SSL.ZeroReturnError as e: if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: return b'' else: raise except OpenSSL.SSL.WantReadError: rd, wd, ed = select.select( [self.socket], [], [], self.socket.gettimeout()) if not rd: raise timeout('The read operation timed out') else: return self.recv(*args, **kwargs) else: return data def settimeout(self, timeout): return self.socket.settimeout(timeout) def _send_until_done(self, data): while True: try: return self.connection.send(data) except OpenSSL.SSL.WantWriteError: _, wlist, _ = select.select([], [self.socket], [], self.socket.gettimeout()) if not wlist: raise timeout() continue def sendall(self, data): while len(data): sent = self._send_until_done(data) data = data[sent:] def close(self): if self._makefile_refs < 1: return self.connection.shutdown() else: self._makefile_refs -= 1 def getpeercert(self, binary_form=False): x509 = self.connection.get_peer_certificate() if not x509: return x509 if binary_form: return OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_ASN1, x509) return { 'subject': ( (('commonName', x509.get_subject().CN),), ), 'subjectAltName': [ ('DNS', value) for value in get_subj_alt_name(x509) ] } def _reuse(self): self._makefile_refs += 1 def _drop(self): if self._makefile_refs < 1: self.close() else: self._makefile_refs -= 1 def _verify_callback(cnx, x509, err_no, err_depth, return_code): return err_no == 0 def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None, ca_certs=None, server_hostname=None, ssl_version=None): ctx = OpenSSL.SSL.Context(_openssl_versions[ssl_version]) if certfile: keyfile = keyfile or certfile # Match behaviour of the normal python ssl library ctx.use_certificate_file(certfile) if keyfile: ctx.use_privatekey_file(keyfile) if cert_reqs != ssl.CERT_NONE: ctx.set_verify(_openssl_verify[cert_reqs], _verify_callback) if ca_certs: try: ctx.load_verify_locations(ca_certs, None) except OpenSSL.SSL.Error as e: raise ssl.SSLError('bad ca_certs: %r' % ca_certs, e) else: ctx.set_default_verify_paths() # Disable TLS compression to migitate CRIME attack (issue #309) OP_NO_COMPRESSION = 0x20000 ctx.set_options(OP_NO_COMPRESSION) # Set list of supported ciphersuites. ctx.set_cipher_list(DEFAULT_SSL_CIPHER_LIST) cnx = OpenSSL.SSL.Connection(ctx, sock) cnx.set_tlsext_host_name(server_hostname) cnx.set_connect_state() while True: try: cnx.do_handshake() except OpenSSL.SSL.WantReadError: select.select([sock], [], []) continue except OpenSSL.SSL.Error as e: raise ssl.SSLError('bad handshake', e) break return WrappedSocket(cnx, sock)