*filename* is not given, the filename is the output of :func:`tempfile.mktemp`
with a suffix that matches the suffix of the last path component of the input
URL. If *reporthook* is given, it must be a function accepting three numeric
- parameters. It will be called after each chunk of data is read from the
+ parameters: A chunk number, the maximum size chunks are read in and the total size of the download
+ (-1 if unknown). It will be called once at the start and after each chunk of data is read from the
network. *reporthook* is ignored for local URLs.
If the *url* uses the :file:`http:` scheme identifier, the optional *data*
"""Tests urllib.request.urlretrieve using the network."""
@contextlib.contextmanager
- def urlretrieve(self, *args):
+ def urlretrieve(self, *args, **kwargs):
resource = args[0]
with support.transient_internet(resource):
- file_location, info = urllib.request.urlretrieve(*args)
+ file_location, info = urllib.request.urlretrieve(*args, **kwargs)
try:
yield file_location, info
finally:
self.assertIsInstance(info, email.message.Message,
"info is not an instance of email.message.Message")
+ logo = "http://www.python.org/community/logos/python-logo-master-v3-TM.png"
+
def test_data_header(self):
- logo = "http://www.python.org/community/logos/python-logo-master-v3-TM.png"
- with self.urlretrieve(logo) as (file_location, fileheaders):
+ with self.urlretrieve(self.logo) as (file_location, fileheaders):
datevalue = fileheaders.get('Date')
dateformat = '%a, %d %b %Y %H:%M:%S GMT'
try:
except ValueError:
self.fail('Date value not in %r format', dateformat)
+ def test_reporthook(self):
+ records = []
+ def recording_reporthook(blocks, block_size, total_size):
+ records.append((blocks, block_size, total_size))
+
+ with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
+ file_location, fileheaders):
+ expected_size = int(fileheaders['Content-Length'])
+
+ records_repr = repr(records) # For use in error messages.
+ self.assertGreater(len(records), 1, msg="There should always be two "
+ "calls; the first one before the transfer starts.")
+ self.assertEqual(records[0][0], 0)
+ self.assertGreater(records[0][1], 0,
+ msg="block size can't be 0 in %s" % records_repr)
+ self.assertEqual(records[0][2], expected_size)
+ self.assertEqual(records[-1][2], expected_size)
+
+ block_sizes = {block_size for _, block_size, _ in records}
+ self.assertEqual({records[0][1]}, block_sizes,
+ msg="block sizes in %s must be equal" % records_repr)
+ self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
+ msg="number of blocks * block size must be"
+ " >= total size in %s" % records_repr)
+
def test_main():
support.requires('network')