Req-sent-unread-response _CS_REQ_SENT <response_class>
"""
-import io
-import socket
import email.parser
import email.message
+import io
+import os
+import socket
from urllib.parse import urlsplit
import warnings
# ignore the error... the caller will know if they can retry.
if self.debuglevel > 0:
print("send:", repr(str))
- try:
- blocksize = 8192
- if hasattr(str, "read") :
+ blocksize = 8192
+ if hasattr(str, "read") :
+ if self.debuglevel > 0:
+ print("sendIng a read()able")
+ encode = False
+ if "b" not in str.mode:
+ encode = True
if self.debuglevel > 0:
- print("sendIng a read()able")
- encode = False
- if "b" not in str.mode:
- encode = True
- if self.debuglevel > 0:
- print("encoding file using iso-8859-1")
- while 1:
- data = str.read(blocksize)
- if not data:
- break
- if encode:
- data = data.encode("iso-8859-1")
- self.sock.sendall(data)
- else:
- self.sock.sendall(str)
- except socket.error as v:
- if v.args[0] == 32: # Broken pipe
- self.close()
- raise
+ print("encoding file using iso-8859-1")
+ while 1:
+ data = str.read(blocksize)
+ if not data:
+ break
+ if encode:
+ data = data.encode("iso-8859-1")
+ self.sock.sendall(data)
+ else:
+ self.sock.sendall(str)
def _output(self, s):
"""Add a line of output to the current request buffer.
def request(self, method, url, body=None, headers={}):
"""Send a complete request to the server."""
- try:
- self._send_request(method, url, body, headers)
- except socket.error as v:
- # trap 'Broken pipe' if we're allowed to automatically reconnect
- if v.args[0] != 32 or not self.auto_open:
- raise
- # try one more time
- self._send_request(method, url, body, headers)
+ self._send_request(method, url, body, headers)
def _set_content_length(self, body):
# Set the content-length based on the body.
except TypeError as te:
# If this is a file-like object, try to
# fstat its file descriptor
- import os
try:
thelen = str(os.fstat(body.fileno()).st_size)
except (AttributeError, OSError):
self.putheader('Content-Length', thelen)
def _send_request(self, method, url, body, headers):
- # honour explicitly requested Host: and Accept-Encoding headers
+ # Honor explicitly requested Host: and Accept-Encoding: headers.
header_names = dict.fromkeys([k.lower() for k in headers])
skips = {}
if 'host' in header_names:
def connect(self):
"Connect to a host on a given (SSL) port."
- sock = socket.create_connection((self.host, self.port), self.timeout)
+ sock = socket.create_connection((self.host, self.port),
+ self.timeout)
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
+import errno
from http import client
import io
import socket
raise client.UnimplementedFileMode()
return self.fileclass(self.text)
+class EPipeSocket(FakeSocket):
+
+ def __init__(self, text, pipe_trigger):
+ # When sendall() is called with pipe_trigger, raise EPIPE.
+ FakeSocket.__init__(self, text)
+ self.pipe_trigger = pipe_trigger
+
+ def sendall(self, data):
+ if self.pipe_trigger in data:
+ raise socket.error(errno.EPIPE, "gotcha")
+ self.data += data
+
+ def close(self):
+ pass
+
class NoEOFStringIO(io.BytesIO):
"""Like StringIO, but raises AssertionError on EOF.
finally:
resp.close()
+ def test_epipe(self):
+ sock = EPipeSocket(
+ "HTTP/1.0 401 Authorization Required\r\n"
+ "Content-type: text/html\r\n"
+ "WWW-Authenticate: Basic realm=\"example\"\r\n",
+ b"Content-Length")
+ conn = client.HTTPConnection("example.com")
+ conn.sock = sock
+ self.assertRaises(socket.error,
+ lambda: conn.request("PUT", "/url", "body"))
+ resp = conn.getresponse()
+ self.assertEqual(401, resp.status)
+ self.assertEqual("Basic realm=\"example\"",
+ resp.getheader("www-authenticate"))
class OfflineTest(TestCase):
def test_responses(self):
def setUp(self):
self.conn = client.HTTPConnection('example.com')
- self.sock = FakeSocket("")
+ self.conn.sock = self.sock = FakeSocket("")
self.conn.sock = self.sock
def get_headers_and_fp(self):