yield element
+.. function:: itertools.chain.from_iterable(iterable)
+
+ Alternate constructor for :func:`chain`. Gets chained inputs from a
+ single iterable argument that is evaluated lazily. Equivalent to::
+
+ @classmethod
+ def from_iterable(iterables):
+ for it in iterables:
+ for element in it:
+ yield element
+
+ .. versionadded:: 2.6
+
.. function:: combinations(iterable, r)
Return successive *r* length combinations of elements in the *iterable*.
- Combinations are emitted in a lexicographic sort order. So, if the
+ Combinations are emitted in lexicographic sort order. So, if the
input *iterable* is sorted, the combination tuples will be produced
in sorted order.
Elements are treated as unique based on their position, not on their
value. So if the input elements are unique, there will be no repeat
- values within a single combination.
+ values in each combination.
Each result tuple is ordered to match the input order. So, every
combination is a subsequence of the input *iterable*.
example :func:`islice` or :func:`takewhile`).
+.. function:: permutations(iterable[, r])
+
+ Return successive *r* length permutations of elements in the *iterable*.
+
+ If *r* is not specified or is ``None``, then *r* defaults to the length
+ of the *iterable* and all possible full-length permutations
+ are generated.
+
+ Permutations are emitted in lexicographic sort order. So, if the
+ input *iterable* is sorted, the permutation tuples will be produced
+ in sorted order.
+
+ Elements are treated as unique based on their position, not on their
+ value. So if the input elements are unique, there will be no repeat
+ values in each permutation.
+
+ Example: ``permutations(range(3),2) --> (1,2) (1,3) (2,1) (2,3) (3,1) (3,2)``
+
+ .. versionadded:: 2.6
+
.. function:: product(*iterables[, repeat])
Cartesian product of input iterables.
def ncycles(seq, n):
"Returns the sequence elements n times"
- return chain(*repeat(seq, n))
+ return chain.from_iterable(repeat(seq, n))
def dotproduct(vec1, vec2):
return sum(imap(operator.mul, vec1, vec2))
def flatten(listOfLists):
- return list(chain(*listOfLists))
+ return list(chain.from_iterable(listOfLists))
def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
"""
if times is None:
return starmap(func, repeat(args))
- else:
- return starmap(func, repeat(args, times))
+ return starmap(func, repeat(args, times))
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
def roundrobin(*iterables):
"roundrobin('abc', 'd', 'ef') --> 'a', 'd', 'e', 'b', 'f', 'c'"
- # Recipe contributed by George Sakkis
+ # Recipe credited to George Sakkis
pending = len(iterables)
nexts = cycle(iter(it).next for it in iterables)
while pending:
nexts = cycle(islice(nexts, pending))
def powerset(iterable):
- "powerset('ab') --> set([]), set(['b']), set(['a']), set(['a', 'b'])"
- skip = object()
- for t in product(*izip(repeat(skip), iterable)):
- yield set(e for e in t if e is not skip)
+ "powerset('ab') --> set([]), set(['a']), set(['b']), set(['a', 'b'])"
+ # Recipe credited to Eric Raymond
+ pairs = [(2**i, x) for i, x in enumerate(iterable)]
+ for n in xrange(2**len(pairs)):
+ yield set(x for m, x in pairs if m&n)
list.sort(key=lambda a: a.lower())
f = StringIO()
displaypath = cgi.escape(urllib.unquote(self.path))
- f.write("<title>Directory listing for %s</title>\n" % displaypath)
- f.write("<h2>Directory listing for %s</h2>\n" % displaypath)
+ f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
+ f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
+ f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
f.write("<hr>\n<ul>\n")
for name in list:
fullname = os.path.join(path, name)
# Note: a link to a directory displays with @ and links with /
f.write('<li><a href="%s">%s</a>\n'
% (urllib.quote(linkname), cgi.escape(displayname)))
- f.write("</ul>\n<hr>\n")
+ f.write("</ul>\n<hr>\n</body>\n</html>\n")
length = f.tell()
f.seek(0)
self.send_response(200)
def collect_children(self):
"""Internal routine to wait for children that have exited."""
- while self.active_children:
- if len(self.active_children) < self.max_children:
- options = os.WNOHANG
- else:
- # If the maximum number of children are already
- # running, block while waiting for a child to exit
- options = 0
+ if self.active_children is None: return
+ while len(self.active_children) >= self.max_children:
+ # XXX: This will wait for any child process, not just ones
+ # spawned by this library. This could confuse other
+ # libraries that expect to be able to wait for their own
+ # children.
try:
- pid, status = os.waitpid(0, options)
+ pid, status = os.waitpid(0, options=0)
except os.error:
pid = None
- if not pid: break
+ if pid not in self.active_children: continue
+ self.active_children.remove(pid)
+
+ # XXX: This loop runs more system calls than it ought
+ # to. There should be a way to put the active_children into a
+ # process group and then use os.waitpid(-pgid) to wait for any
+ # of that set, but I couldn't find a way to allocate pgids
+ # that couldn't collide.
+ for child in self.active_children:
+ try:
+ pid, status = os.waitpid(child, os.WNOHANG)
+ except os.error:
+ pid = None
+ if not pid: continue
try:
self.active_children.remove(pid)
except ValueError as e:
Test suite for SocketServer.py.
"""
+import contextlib
import errno
import imp
import os
if verbose: print("thread: done")
+@contextlib.contextmanager
+def simple_subprocess(testcase):
+ pid = os.fork()
+ if pid == 0:
+ # Don't throw an exception; it would be caught by the test harness.
+ os._exit(72)
+ yield None
+ pid2, status = os.waitpid(pid, 0)
+ testcase.assertEquals(pid2, pid)
+ testcase.assertEquals(72 << 8, status)
+
+
class SocketServerTest(unittest.TestCase):
"""Test all socket servers."""
self.stream_examine)
if HAVE_FORKING:
- def test_ThreadingTCPServer(self):
- self.run_server(SocketServer.ForkingTCPServer,
- SocketServer.StreamRequestHandler,
- self.stream_examine)
+ def test_ForkingTCPServer(self):
+ with simple_subprocess(self):
+ self.run_server(SocketServer.ForkingTCPServer,
+ SocketServer.StreamRequestHandler,
+ self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
- self.run_server(ForkingUnixStreamServer,
- SocketServer.StreamRequestHandler,
- self.stream_examine)
+ with simple_subprocess(self):
+ self.run_server(ForkingUnixStreamServer,
+ SocketServer.StreamRequestHandler,
+ self.stream_examine)
def test_UDPServer(self):
self.run_server(SocketServer.UDPServer,
if HAVE_FORKING:
def test_ForkingUDPServer(self):
- self.run_server(SocketServer.ForkingUDPServer,
- SocketServer.DatagramRequestHandler,
- self.dgram_examine)
+ with simple_subprocess(self):
+ self.run_server(SocketServer.ForkingUDPServer,
+ SocketServer.DatagramRequestHandler,
+ self.dgram_examine)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
'2.5.1150':'{31800004-6386-4999-a519-518f2d78d8f0}', # 2.5.1
'2.5.2150':'{6304a7da-1132-4e91-a343-a296269eab8a}', # 2.5.2c1
'2.5.2150':'{6b976adf-8ae8-434e-b282-a06c7f624d2f}', # 2.5.2
+ '2.6.101': '{0ba82e1b-52fd-4e03-8610-a6c76238e8a8}', # 2.6a1
+ '2.6.102': '{3b27e16c-56db-4570-a2d3-e9a26180c60b}', # 2.6a2
+ '2.6.103': '{cd06a9c5-bde5-4bd7-9874-48933997122a}', # 2.6a3
+ '2.6.104': '{dc6ed634-474a-4a50-a547-8de4b7491e53}', # 2.6a4
'3.0.101': '{8554263a-3242-4857-9359-aa87bc2c58c2}', # 3.0a1
'3.0.102': '{692d6e2c-f0ac-40b8-a133-7191aeeb67f9}', # 3.0a2
'3.0.103': '{49cb2995-751a-4753-be7a-d0b1bb585e06}', # 3.0a3