if strict is not None:
self.strict = strict
- def _set_tunnel(self, host, port=None):
+ def _set_tunnel(self, host, port=None, headers=None):
self._tunnel_host = host
self._tunnel_port = port
+ if headers:
+ self._tunnel_headers = headers
+ else:
+ self._tunnel_headers.clear()
def _set_hostport(self, host, port):
if port is None:
def _tunnel(self):
self._set_hostport(self._tunnel_host, self._tunnel_port)
- connect_str = "CONNECT %s:%d HTTP/1.0\r\n\r\n" %(self.host, self.port)
+ connect_str = "CONNECT %s:%d HTTP/1.0\r\n" %(self.host, self.port)
connect_bytes = connect_str.encode("ascii")
self.send(connect_bytes)
+ for header, value in self._tunnel_headers.iteritems():
+ header_str = "%s: %s\r\n" % (header, value)
+ header_bytes = header_str.encode("ascii")
+ self.send(header_bytes)
+
response = self.response_class(self.sock, strict = self.strict,
- method= self._method)
+ method = self._method)
(version, code, message) = response._read_status()
+
if code != 200:
self.close()
raise socket.error("Tunnel connection failed: %d %s" % (code,
def __call__(self, *args):
return self.handle(self.meth_name, self.action, *args)
+class MockHTTPResponse(io.IOBase):
+ def __init__(self, fp, msg, status, reason):
+ self.fp = fp
+ self.msg = msg
+ self.status = status
+ self.reason = reason
+ self.code = 200
+
+ def read(self):
+ return ''
+
+ def info(self):
+ return {}
+
+ def geturl(self):
+ return self.url
+
+
+class MockHTTPClass:
+ def __init__(self):
+ self.level = 0
+ self.req_headers = []
+ self.data = None
+ self.raise_on_endheaders = False
+ self._tunnel_headers = {}
+
+ def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
+ self.host = host
+ self.timeout = timeout
+ return self
+
+ def set_debuglevel(self, level):
+ self.level = level
+
+ def _set_tunnel(self, host, port=None, headers=None):
+ self._tunnel_host = host
+ self._tunnel_port = port
+ if headers:
+ self._tunnel_headers = headers
+ else:
+ self._tunnel_headers.clear()
+
+ def request(self, method, url, body=None, headers={}):
+ self.method = method
+ self.selector = url
+ self.req_headers += headers.items()
+ self.req_headers.sort()
+ if body:
+ self.data = body
+ if self.raise_on_endheaders:
+ import socket
+ raise socket.error()
+ def getresponse(self):
+ return MockHTTPResponse(MockFile(), {}, 200, "OK")
+
class MockHandler:
# useful for testing handler machinery
# see add_ordered_mock_handlers() docstring
msg = email.message_from_string("\r\n\r\n")
return MockResponse(200, "OK", msg, "", req.get_full_url())
+class MockHTTPSHandler(urllib.request.AbstractHTTPHandler):
+ # Useful for testing the Proxy-Authorization request by verifying the
+ # properties of httpcon
+ httpconn = MockHTTPClass()
+ def https_open(self, req):
+ return self.do_open(self.httpconn, req)
+
class MockPasswordManager:
def add_password(self, realm, uri, user, password):
self.realm = realm
self.assertEqual(req.type, "ftp")
def test_http(self):
- class MockHTTPResponse(io.IOBase):
- def __init__(self, fp, msg, status, reason):
- self.fp = fp
- self.msg = msg
- self.status = status
- self.reason = reason
- self.code = 200
- def read(self):
- return ''
- def info(self):
- return {}
- def geturl(self):
- return self.url
- class MockHTTPClass:
- def __init__(self):
- self.level = 0
- self.req_headers = []
- self.data = None
- self.raise_on_endheaders = False
- def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
- self.host = host
- self.timeout = timeout
- return self
- def set_debuglevel(self, level):
- self.level = level
- def request(self, method, url, body=None, headers={}):
- self.method = method
- self.selector = url
- self.req_headers += headers.items()
- self.req_headers.sort()
- if body:
- self.data = body
- if self.raise_on_endheaders:
- import socket
- raise socket.error()
- def getresponse(self):
- return MockHTTPResponse(MockFile(), {}, 200, "OK")
h = urllib.request.AbstractHTTPHandler()
o = h.parent = MockOpener()
self.assertEqual([(handlers[0], "https_open")],
[tup[0:2] for tup in o.calls])
+ def test_proxy_https_proxy_authorization(self):
+ o = OpenerDirector()
+ ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128'))
+ o.add_handler(ph)
+ https_handler = MockHTTPSHandler()
+ o.add_handler(https_handler)
+ req = Request("https://www.example.com/")
+ req.add_header("Proxy-Authorization","FooBar")
+ req.add_header("User-Agent","Grail")
+ self.assertEqual(req.get_host(), "www.example.com")
+ self.assertIsNone(req._tunnel_host)
+ r = o.open(req)
+ # Verify Proxy-Authorization gets tunneled to request.
+ # httpsconn req_headers do not have the Proxy-Authorization header but
+ # the req will have.
+ self.assertFalse(("Proxy-Authorization","FooBar") in
+ https_handler.httpconn.req_headers)
+ self.assertTrue(("User-Agent","Grail") in
+ https_handler.httpconn.req_headers)
+ self.assertIsNotNone(req._tunnel_host)
+ self.assertEqual(req.get_host(), "proxy.example.com:3128")
+ self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
def test_basic_auth(self, quote_char='"'):
opener = OpenerDirector()
objects of interest:
-OpenerDirector -- Sets up the User-Agent as the Python-urllib and manages the
-Handler classes while dealing with both requests and responses.
+OpenerDirector -- Sets up the User Agent as the Python-urllib client and manages
+the Handler classes, while dealing with requests and responses.
Request -- An object that encapsulates the state of a request. The
state can be as simple as the URL. It can also include extra HTTP
headers = dict((name.title(), val) for name, val in headers.items())
if req._tunnel_host:
- h._set_tunnel(req._tunnel_host)
+ tunnel_headers = {}
+ proxy_auth_hdr = "Proxy-Authorization"
+ if proxy_auth_hdr in headers:
+ tunnel_headers[proxy_auth_hdr] = headers[proxy_auth_hdr]
+ # Proxy-Authorization should not be sent to origin
+ # server.
+ del headers[proxy_auth_hdr]
+ h._set_tunnel(req._tunnel_host, headers=tunnel_headers)
try:
h.request(req.get_method(), req.selector, req.data, headers)