# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007 Python Software
# Foundation; All Rights Reserved
"""A HTTPSConnection/Handler with additional proxy and cert validation features.
In particular, monkey patches in Python r74203 to provide support for CONNECT
proxies and adds SSL cert validation if the ssl module is present.
"""
__author__ = "{frew,nick.johnson}@google.com (Fred Wulff and Nick Johnson)"
import base64
import httplib
import logging
import socket
from urllib import splitpasswd
from urllib import splittype
from urllib import splituser
import urllib2
class InvalidCertificateException(httplib.HTTPException):
"""Raised when a certificate is provided with an invalid hostname."""
def __init__(self, host, cert, reason):
"""Constructor.
Args:
host: The hostname the connection was made to.
cert: The SSL certificate (as a dictionary) the host returned.
reason: user readable error reason.
"""
httplib.HTTPException.__init__(self)
self.host = host
self.cert = cert
self.reason = reason
def __str__(self):
return ("Host %s returned an invalid certificate (%s): %s\n"
"To learn more, see "
"http://code.google.com/appengine/kb/general.html#rpcssl" %
(self.host, self.reason, self.cert))
try:
import ssl
_CAN_VALIDATE_CERTS = True
except ImportError:
_CAN_VALIDATE_CERTS = False
def can_validate_certs():
"""Return True if we have the SSL package and can validate certificates."""
return _CAN_VALIDATE_CERTS
# Reexport SSLError so clients don't have to to do their own checking for ssl's
# existence.
if can_validate_certs():
SSLError = ssl.SSLError
else:
SSLError = None
def create_fancy_connection(tunnel_host=None, key_file=None,
cert_file=None, ca_certs=None,
proxy_authorization=None):
# This abomination brought to you by the fact that
# the HTTPHandler creates the connection instance in the middle
# of do_open so we need to add the tunnel host to the class.
class PresetProxyHTTPSConnection(httplib.HTTPSConnection):
"""An HTTPS connection that uses a proxy defined by the enclosing scope."""
def __init__(self, *args, **kwargs):
httplib.HTTPSConnection.__init__(self, *args, **kwargs)
self._tunnel_host = tunnel_host
if tunnel_host:
logging.debug("Creating preset proxy https conn: %s", tunnel_host)
self.key_file = key_file
self.cert_file = cert_file
self.ca_certs = ca_certs
if can_validate_certs():
if self.ca_certs:
self.cert_reqs = ssl.CERT_REQUIRED
else:
self.cert_reqs = ssl.CERT_NONE
def _get_hostport(self, host, port):
# Python 2.7.7rc1 (hg r90728:568041fd8090), 3.4.1 and 3.5 rename
# _set_hostport to _get_hostport and changes it's functionality. The
# Python 2.7.7rc1 version of this method is included here for
# compatibility with earlier versions of Python. Without this, HTTPS over
# HTTP CONNECT proxies cannot be used.
# This method may be removed if compatibility with Python <2.7.7rc1 is not
# required.
# Python bug: http://bugs.python.org/issue7776
if port is None:
i = host.rfind(":")
j = host.rfind("]") # ipv6 addresses have [...]
if i > j:
try:
port = int(host[i+1:])
except ValueError:
if host[i+1:] == "": # http://foo.com:/ == http://foo.com/
port = self.default_port
else:
raise httplib.InvalidURL("nonnumeric port: '%s'" % host[i+1:])
host = host[:i]
else:
port = self.default_port
if host and host[0] == "[" and host[-1] == "]":
host = host[1:-1]
return (host, port)
def _tunnel(self):
self.host, self.port = self._get_hostport(self._tunnel_host, None)
logging.info("Connecting through tunnel to: %s:%d",
self.host, self.port)
self.send("CONNECT %s:%d HTTP/1.0\r\n" % (self.host, self.port))
if proxy_authorization:
self.send("Proxy-Authorization: %s\r\n" % proxy_authorization)
# blank line
self.send("\r\n")
response = self.response_class(self.sock, strict=self.strict,
method=self._method)
# pylint: disable=protected-access
(_, code, message) = response._read_status()
if code != 200:
self.close()
raise socket.error("Tunnel connection failed: %d %s" %
(code, message.strip()))
while True:
line = response.fp.readline()
if line == "\r\n":
break
def _get_valid_hosts_for_cert(self, cert):
"""Returns a list of valid host globs for an SSL certificate.
Args:
cert: A dictionary representing an SSL certificate.
Returns:
list: A list of valid host globs.
"""
if "subjectAltName" in cert:
return [x[1] for x in cert["subjectAltName"] if x[0].lower() == "dns"]
else:
# Return a list of commonName fields
return [x[0][1] for x in cert["subject"]
if x[0][0].lower() == "commonname"]
def _validate_certificate_hostname(self, cert, hostname):
"""Perform RFC2818/6125 validation against a cert and hostname.
Args:
cert: A dictionary representing an SSL certificate.
hostname: The hostname to test.
Returns:
bool: Whether or not the hostname is valid for this certificate.
"""
hosts = self._get_valid_hosts_for_cert(cert)
for host in hosts:
# Wildcards are only valid when the * exists at the end of the last
# (left-most) label, and there are at least 3 labels in the expression.
if ("*." in host and host.count("*") == 1 and
host.count(".") > 1 and "." in hostname):
left_expected, right_expected = host.split("*.")
left_hostname, right_hostname = hostname.split(".", 1)
if (left_hostname.startswith(left_expected) and
right_expected == right_hostname):
return True
elif host == hostname:
return True
return False
def connect(self):
# TODO(frew): When we drop support for <2.6 (in the far distant future),
# change this to socket.create_connection.
self.sock = _create_connection((self.host, self.port))
if self._tunnel_host:
self._tunnel()
# ssl and FakeSocket got deprecated. Try for the new hotness of wrap_ssl,
# with fallback. Note: Since can_validate_certs() just checks for the
# ssl module, it's equivalent to attempting to import ssl from
# the function, but doesn't require a dynamic import, which doesn't
# play nicely with dev_appserver.
if can_validate_certs():
self.sock = ssl.wrap_socket(self.sock,
keyfile=self.key_file,
certfile=self.cert_file,
ca_certs=self.ca_certs,
cert_reqs=self.cert_reqs)
if self.cert_reqs & ssl.CERT_REQUIRED:
cert = self.sock.getpeercert()
hostname = self.host.split(":", 0)[0]
if not self._validate_certificate_hostname(cert, hostname):
raise InvalidCertificateException(hostname, cert,
"hostname mismatch")
else:
ssl_socket = socket.ssl(self.sock,
keyfile=self.key_file,
certfile=self.cert_file)
self.sock = httplib.FakeSocket(self.sock, ssl_socket)
return PresetProxyHTTPSConnection
# Here to end of _create_connection copied wholesale from Python 2.6"s socket.py
_GLOBAL_DEFAULT_TIMEOUT = object()
def _create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
"""Connect to *address* and return the socket object.
Convenience function. Connect to *address* (a 2-tuple ``(host,
port)``) and return the socket object. Passing the optional
*timeout* parameter will set the timeout on the socket instance
before attempting to connect. If no *timeout* is supplied, the
global default timeout setting returned by :func:`getdefaulttimeout`
is used.
"""
msg = "getaddrinfo returns an empty list"
host, port = address
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
sock.connect(sa)
return sock
except socket.error, msg:
if sock is not None:
sock.close()
raise socket.error, msg
class FancyRequest(urllib2.Request):
"""A request that allows the use of a CONNECT proxy."""
def __init__(self, *args, **kwargs):
urllib2.Request.__init__(self, *args, **kwargs)
self._tunnel_host = None
self._key_file = None
self._cert_file = None
self._ca_certs = None
def set_proxy(self, host, type):
saved_type = None
if self.get_type() == "https" and not self._tunnel_host:
self._tunnel_host = self.get_host()
saved_type = self.get_type()
urllib2.Request.set_proxy(self, host, type)
if saved_type:
# Don't set self.type, we want to preserve the
# type for tunneling.
self.type = saved_type
def set_ssl_info(self, key_file=None, cert_file=None, ca_certs=None):
self._key_file = key_file
self._cert_file = cert_file
self._ca_certs = ca_certs
class FancyProxyHandler(urllib2.ProxyHandler):
"""A ProxyHandler that works with CONNECT-enabled proxies."""
# Taken verbatim from /usr/lib/python2.5/urllib2.py
def _parse_proxy(self, proxy):
"""Return (scheme, user, password, host/port) given a URL or an authority.
If a URL is supplied, it must have an authority (host:port) component.
According to RFC 3986, having an authority component means the URL must
have two slashes after the scheme:
>>> _parse_proxy('file:/ftp.example.com/')
Traceback (most recent call last):
ValueError: proxy URL with no authority: 'file:/ftp.example.com/'
The first three items of the returned tuple may be None.
Examples of authority parsing:
>>> _parse_proxy('proxy.example.com')
(None, None, None, 'proxy.example.com')
>>> _parse_proxy('proxy.example.com:3128')
(None, None, None, 'proxy.example.com:3128')
The authority component may optionally include userinfo (assumed to be
username:password):
>>> _parse_proxy('joe:[email protected]')
(None, 'joe', 'password', 'proxy.example.com')
>>> _parse_proxy('joe:[email protected]:3128')
(None, 'joe', 'password', 'proxy.example.com:3128')
Same examples, but with URLs instead:
>>> _parse_proxy('http://proxy.example.com/')
('http', None, None, 'proxy.example.com')
>>> _parse_proxy('http://proxy.example.com:3128/')
('http', None, None, 'proxy.example.com:3128')
>>> _parse_proxy('http://joe:[email protected]/')
('http', 'joe', 'password', 'proxy.example.com')
>>> _parse_proxy('http://joe:[email protected]:3128')
('http', 'joe', 'password', 'proxy.example.com:3128')
Everything after the authority is ignored:
>>> _parse_proxy('ftp://joe:[email protected]/rubbish:3128')
('ftp', 'joe', 'password', 'proxy.example.com')
Test for no trailing '/' case:
>>> _parse_proxy('http://joe:[email protected]')
('http', 'joe', 'password', 'proxy.example.com')
"""
scheme, r_scheme = splittype(proxy)
if not r_scheme.startswith("/"):
# authority
scheme = None
authority = proxy
else:
# URL
if not r_scheme.startswith("//"):
raise ValueError("proxy URL with no authority: %r" % proxy)
# We have an authority, so for RFC 3986-compliant URLs (by ss 3.
# and 3.3.), path is empty or starts with '/'
end = r_scheme.find("/", 2)
if end == -1:
end = None
authority = r_scheme[2:end]
userinfo, hostport = splituser(authority)
if userinfo is not None:
user, password = splitpasswd(userinfo)
else:
user = password = None
return scheme, user, password, hostport
def proxy_open(self, req, proxy, type):
# This block is copied wholesale from Python2.6 urllib2.
# It is idempotent, so the superclass method call executes as normal
# if invoked.
orig_type = req.get_type()
proxy_type, user, password, hostport = self._parse_proxy(proxy)
if proxy_type is None:
proxy_type = orig_type
if user and password:
user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
creds = base64.b64encode(user_pass).strip()
# Later calls overwrite earlier calls for the same header
req.add_header("Proxy-authorization", "Basic " + creds)
hostport = urllib2.unquote(hostport)
req.set_proxy(hostport, proxy_type)
# This condition is the change
if orig_type == "https":
return None
return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
class FancyHTTPSHandler(urllib2.HTTPSHandler):
"""An HTTPSHandler that works with CONNECT-enabled proxies."""
def do_open(self, http_class, req, **kwargs):
proxy_authorization = None
for header in req.headers:
if header.lower() == "proxy-authorization":
proxy_authorization = req.headers[header]
break
# Intentionally very specific so as to opt for false negatives
# rather than false positives.
try:
return urllib2.HTTPSHandler.do_open(
self,
create_fancy_connection(req._tunnel_host,
req._key_file,
req._cert_file,
req._ca_certs,
proxy_authorization),
req,
**kwargs)
except urllib2.URLError, url_error:
try:
import ssl
if (type(url_error.reason) == ssl.SSLError and
url_error.reason.args[0] == 1):
# Display the reason to the user. Need to use args for python2.5
# compat.
raise InvalidCertificateException(req.host, "",
url_error.reason.args[1])
except ImportError:
pass
raise url_error
# We have to implement this so that we persist the tunneling behavior
# through redirects.
class FancyRedirectHandler(urllib2.HTTPRedirectHandler):
"""A redirect handler that persists CONNECT-enabled proxy information."""
def redirect_request(self, req, *args, **kwargs):
new_req = urllib2.HTTPRedirectHandler.redirect_request(
self, req, *args, **kwargs)
# Same thing as in our set_proxy implementation, but in this case
# we"ve only got a Request to work with, so it was this or copy
# everything over piecemeal.
#
# Note that we do not persist tunneling behavior from an http request
# to an https request, because an http request does not set _tunnel_host.
#
# Also note that in Python < 2.6, you will get an error in
# FancyHTTPSHandler.do_open() on an https urllib2.Request that uses an http
# proxy, since the proxy type will be set to http instead of https.
# (FancyRequest, and urllib2.Request in Python >= 2.6 set the proxy type to
# https.) Such an urllib2.Request could result from this redirect
# if you are redirecting from an http request (since an an http request
# does not have _tunnel_host set, and thus you will not set the proxy
# in the code below), and if you have defined a proxy for https in, say,
# FancyProxyHandler, and that proxy has type http.
if hasattr(req, "_tunnel_host") and isinstance(new_req, urllib2.Request):
if new_req.get_type() == "https":
if req._tunnel_host:
# req is proxied, so copy the proxy info.
new_req._tunnel_host = new_req.get_host()
new_req.set_proxy(req.host, "https")
else:
# req is not proxied, so just make sure _tunnel_host is defined.
new_req._tunnel_host = None
new_req.type = "https"
if hasattr(req, "_key_file") and isinstance(new_req, urllib2.Request):
# Copy the auxiliary data in case this or any further redirect is https
new_req._key_file = req._key_file
new_req._cert_file = req._cert_file
new_req._ca_certs = req._ca_certs
return new_req