self.__response = None
self.__state = _CS_IDLE
self._method = None
+ self._tunnel_host = None
+ self._tunnel_port = None
self._set_hostport(host, port)
if strict is not None:
self.strict = strict
+ def _set_tunnel(self, host, port=None):
+ self._tunnel_host = host
+ self._tunnel_port = port
+
def _set_hostport(self, host, port):
if port is None:
i = host.rfind(':')
def set_debuglevel(self, level):
self.debuglevel = level
+ def _tunnel(self):
+ self._set_hostport(self._tunnel_host, self._tunnel_port)
+ connect_str = "CONNECT %s:%d HTTP/1.0\r\n\r\n" %(self.host, self.port)
+ connect_bytes = connect_str.encode("ascii")
+ self.send(connect_bytes)
+ response = self.response_class(self.sock, strict = self.strict,
+ method= self._method)
+ (version, code, message) = response._read_status()
+ if code != 200:
+ self.close()
+ raise socket.error("Tunnel connection failed: %d %s" % (code,
+ message.strip()))
+ while True:
+ line = response.fp.readline()
+ if line == b'\r\n':
+ break
+
def connect(self):
"""Connect to the host and port specified in __init__."""
self.sock = socket.create_connection((self.host,self.port),
self.timeout)
+ if self._tunnel_host:
+ self._tunnel()
def close(self):
"""Close the connection to the HTTP server."""
sock = socket.create_connection((self.host, self.port),
self.timeout)
+
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
+ def test_proxy_https(self):
+ o = OpenerDirector()
+ ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128"))
+ o.add_handler(ph)
+ meth_spec = [
+ [("https_open", "return response")]
+ ]
+ handlers = add_ordered_mock_handlers(o, meth_spec)
+
+ req = Request("https://www.example.com/")
+ self.assertEqual(req.get_host(), "www.example.com")
+ r = o.open(req)
+ self.assertEqual(req.get_host(), "proxy.example.com:3128")
+ self.assertEqual([(handlers[0], "https_open")],
+ [tup[0:2] for tup in o.calls])
+
+
def test_basic_auth(self, quote_char='"'):
opener = OpenerDirector()
password_manager = MockPasswordManager()
self.full_url = unwrap(url)
self.data = data
self.headers = {}
+ self._tunnel_host = None
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
# End deprecated methods
def set_proxy(self, host, type):
- self.host, self.type = host, type
- self.selector = self.full_url
+ if self.type == 'https' and not self._tunnel_host:
+ self._tunnel_host = self.host
+ else:
+ self.type= type
+ self.selector = self.full_url
+ self.host = host
def has_proxy(self):
return self.selector == self.full_url
req.add_header('Proxy-authorization', 'Basic ' + creds)
hostport = unquote(hostport)
req.set_proxy(hostport, proxy_type)
- if orig_type == proxy_type:
+ if orig_type == proxy_type or orig_type == 'https':
# let other handlers take care of it
return None
else:
# request.
headers["Connection"] = "close"
headers = dict((name.title(), val) for name, val in headers.items())
+
+ if req._tunnel_host:
+ h._set_tunnel(req._tunnel_host)
+
try:
h.request(req.get_method(), req.selector, req.data, headers)
r = h.getresponse() # an HTTPResponse instance
Library
-------
+- Issue #1424152: Fix for http.client, urllib.request to support SSL while
+ working through proxy. Original patch by Christopher Li, changes made by
+ Senthil Kumaran
+
- importlib.abc.PyLoader did not inherit from importlib.abc.ResourceLoader like
the documentation said it did even though the code in PyLoader relied on the
abstract method required by ResourceLoader.