self.assertTrue(response.closed)
self.assertTrue(conn.sock.file_closed)
+ def test_chunked_extension(self):
+ extra = '3;foo=bar\r\n' + 'abc\r\n'
+ expected = chunked_expected + b'abc'
+
+ sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ resp.close()
+
+ def test_chunked_missing_end(self):
+ """some servers may serve up a short chunked encoding stream"""
+ expected = chunked_expected
+ sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ resp.close()
+
+ def test_chunked_trailers(self):
+ """See that trailers are read and ignored"""
+ expected = chunked_expected
+ sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # we should have reached the end of the file
+ self.assertEqual(sock.file.read(100), b"") #we read to the end
+ resp.close()
+
+ def test_chunked_sync(self):
+ """Check that we don't read past the end of the chunked-encoding stream"""
+ expected = chunked_expected
+ extradata = "extradata"
+ sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # the file should now have our extradata ready to be read
+ self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end
+ resp.close()
+
+ def test_content_length_sync(self):
+ """Check that we don't read past the end of the Content-Length stream"""
+ extradata = "extradata"
+ expected = b"Hello123\r\n"
+ sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # the file should now have our extradata ready to be read
+ self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end
+ resp.close()
+
+class ExtendedReadTest(TestCase):
+ """
+ Test peek(), read1(), readline()
+ """
+ lines = (
+ 'HTTP/1.1 200 OK\r\n'
+ '\r\n'
+ 'hello world!\n'
+ 'and now \n'
+ 'for something completely different\n'
+ 'foo'
+ )
+ lines_expected = lines[lines.find('hello'):].encode("ascii")
+ lines_chunked = (
+ 'HTTP/1.1 200 OK\r\n'
+ 'Transfer-Encoding: chunked\r\n\r\n'
+ 'a\r\n'
+ 'hello worl\r\n'
+ '3\r\n'
+ 'd!\n\r\n'
+ '9\r\n'
+ 'and now \n\r\n'
+ '23\r\n'
+ 'for something completely different\n\r\n'
+ '3\r\n'
+ 'foo\r\n'
+ '0\r\n' # terminating chunk
+ '\r\n' # end of trailers
+ )
+
+ def setUp(self):
+ sock = FakeSocket(self.lines)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ resp.fp = io.BufferedReader(resp.fp)
+ self.resp = resp
+
+
+
+ def test_peek(self):
+ resp = self.resp
+ # patch up the buffered peek so that it returns not too much stuff
+ oldpeek = resp.fp.peek
+ def mypeek(n=-1):
+ p = oldpeek(n)
+ if n >= 0:
+ return p[:n]
+ return p[:10]
+ resp.fp.peek = mypeek
+
+ all = []
+ while True:
+ # try a short peek
+ p = resp.peek(3)
+ if p:
+ self.assertGreater(len(p), 0)
+ # then unbounded peek
+ p2 = resp.peek()
+ self.assertGreaterEqual(len(p2), len(p))
+ self.assertTrue(p2.startswith(p))
+ next = resp.read(len(p2))
+ self.assertEqual(next, p2)
+ else:
+ next = resp.read()
+ self.assertFalse(next)
+ all.append(next)
+ if not next:
+ break
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_readline(self):
+ resp = self.resp
+ self._verify_readline(self.resp.readline, self.lines_expected)
+
+ def _verify_readline(self, readline, expected):
+ all = []
+ while True:
+ # short readlines
+ line = readline(5)
+ if line and line != b"foo":
+ if len(line) < 5:
+ self.assertTrue(line.endswith(b"\n"))
+ all.append(line)
+ if not line:
+ break
+ self.assertEqual(b"".join(all), expected)
+
+ def test_read1(self):
+ resp = self.resp
+ def r():
+ res = resp.read1(4)
+ self.assertLessEqual(len(res), 4)
+ return res
+ readliner = Readliner(r)
+ self._verify_readline(readliner.readline, self.lines_expected)
+
+ def test_read1_unbounded(self):
+ resp = self.resp
+ all = []
+ while True:
+ data = resp.read1()
+ if not data:
+ break
+ all.append(data)
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_read1_bounded(self):
+ resp = self.resp
+ all = []
+ while True:
+ data = resp.read1(10)
+ if not data:
+ break
+ self.assertLessEqual(len(data), 10)
+ all.append(data)
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_read1_0(self):
+ self.assertEqual(self.resp.read1(0), b"")
+
+ def test_peek_0(self):
+ p = self.resp.peek(0)
+ self.assertLessEqual(0, len(p))
+
+class ExtendedReadTestChunked(ExtendedReadTest):
+ """
+ Test peek(), read1(), readline() in chunked mode
+ """
+ lines = (
+ 'HTTP/1.1 200 OK\r\n'
+ 'Transfer-Encoding: chunked\r\n\r\n'
+ 'a\r\n'
+ 'hello worl\r\n'
+ '3\r\n'
+ 'd!\n\r\n'
+ '9\r\n'
+ 'and now \n\r\n'
+ '23\r\n'
+ 'for something completely different\n\r\n'
+ '3\r\n'
+ 'foo\r\n'
+ '0\r\n' # terminating chunk
+ '\r\n' # end of trailers
+ )
+
+
+class Readliner:
+ """
+ a simple readline class that uses an arbitrary read function and buffering
+ """
+ def __init__(self, readfunc):
+ self.readfunc = readfunc
+ self.remainder = b""
+
+ def readline(self, limit):
+ data = []
+ datalen = 0
+ read = self.remainder
+ try:
+ while True:
+ idx = read.find(b'\n')
+ if idx != -1:
+ break
+ if datalen + len(read) >= limit:
+ idx = limit - datalen - 1
+ # read more data
+ data.append(read)
+ read = self.readfunc()
+ if not read:
+ idx = 0 #eof condition
+ break
+ idx += 1
+ data.append(read[:idx])
+ self.remainder = read[idx:]
+ return b"".join(data)
+ except:
+ self.remainder = b"".join(data)
+ raise
+
+
class OfflineTest(TestCase):
+ def test_all(self):
+ # Documented objects defined in the module should be in __all__
+ expected = {"responses"} # White-list documented dict() object
+ # HTTPMessage, parse_headers(), and the HTTP status code constants are
+ # intentionally omitted for simplicity
+ blacklist = {"HTTPMessage", "parse_headers"}
+ for name in dir(client):
+ if name in blacklist:
+ continue
+ module_object = getattr(client, name)
+ if getattr(module_object, "__module__", None) == "http.client":
+ expected.add(name)
+ self.assertCountEqual(client.__all__, expected)
+
def test_responses(self):
self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")