]> granicus.if.org Git - python/commitdiff
Add ccbench to the Tools directory
authorAntoine Pitrou <solipsis@pitrou.net>
Mon, 18 Jan 2010 21:10:31 +0000 (21:10 +0000)
committerAntoine Pitrou <solipsis@pitrou.net>
Mon, 18 Jan 2010 21:10:31 +0000 (21:10 +0000)
Tools/ccbench/ccbench.py [new file with mode: 0644]

diff --git a/Tools/ccbench/ccbench.py b/Tools/ccbench/ccbench.py
new file mode 100644 (file)
index 0000000..0b93012
--- /dev/null
@@ -0,0 +1,462 @@
+# -*- coding: utf-8 -*-
+# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
+
+from __future__ import division
+from __future__ import print_function
+
+"""
+ccbench, a Python concurrency benchmark.
+"""
+
+import time
+import os
+import sys
+import functools
+import itertools
+import threading
+import subprocess
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+import platform
+
+# Compatibility
+try:
+    xrange
+except NameError:
+    xrange = range
+
+try:
+    map = itertools.imap
+except AttributeError:
+    pass
+
+
+THROUGHPUT_DURATION = 2.0
+
+LATENCY_PING_INTERVAL = 0.1
+LATENCY_DURATION = 2.0
+
+
+def task_pidigits():
+    """Pi calculation (Python)"""
+    _map = map
+    _count = itertools.count
+    _islice = itertools.islice
+
+    def calc_ndigits(n):
+        # From http://shootout.alioth.debian.org/
+        def gen_x():
+            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
+
+        def compose(a, b):
+            aq, ar, as_, at = a
+            bq, br, bs, bt = b
+            return (aq * bq,
+                    aq * br + ar * bt,
+                    as_ * bq + at * bs,
+                    as_ * br + at * bt)
+
+        def extract(z, j):
+            q, r, s, t = z
+            return (q*j + r) // (s*j + t)
+
+        def pi_digits():
+            z = (1, 0, 0, 1)
+            x = gen_x()
+            while 1:
+                y = extract(z, 3)
+                while y != extract(z, 4):
+                    z = compose(z, next(x))
+                    y = extract(z, 3)
+                z = compose((10, -10*y, 0, 1), z)
+                yield y
+
+        return list(_islice(pi_digits(), n))
+
+    return calc_ndigits, (50, )
+
+def task_regex():
+    """regular expression (C)"""
+    # XXX this task gives horrendous latency results.
+    import re
+    # Taken from the `inspect` module
+    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
+    with open(__file__, "r") as f:
+        arg = f.read(2000)
+
+    def findall(s):
+        t = time.time()
+        try:
+            return pat.findall(s)
+        finally:
+            print(time.time() - t)
+    return pat.findall, (arg, )
+
+def task_sort():
+    """list sorting (C)"""
+    def list_sort(l):
+        l = l[::-1]
+        l.sort()
+
+    return list_sort, (list(range(1000)), )
+
+def task_compress_zlib():
+    """zlib compression (C)"""
+    import zlib
+    with open(__file__, "rb") as f:
+        arg = f.read(5000) * 3
+
+    def compress(s):
+        zlib.decompress(zlib.compress(s, 5))
+    return compress, (arg, )
+
+def task_compress_bz2():
+    """bz2 compression (C)"""
+    import bz2
+    with open(__file__, "rb") as f:
+        arg = f.read(3000) * 2
+
+    def compress(s):
+        bz2.compress(s)
+    return compress, (arg, )
+
+def task_hashing():
+    """SHA1 hashing (C)"""
+    import hashlib
+    with open(__file__, "rb") as f:
+        arg = f.read(5000) * 30
+
+    def compute(s):
+        hashlib.sha1(s).digest()
+    return compute, (arg, )
+
+
+throughput_tasks = [task_pidigits, task_regex]
+for mod in 'bz2', 'hashlib':
+    try:
+        globals()[mod] = __import__(mod)
+    except ImportError:
+        globals()[mod] = None
+
+# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
+# hashlib if available.
+# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
+if bz2 is not None:
+    throughput_tasks.append(task_compress_bz2)
+elif hashlib is not None:
+    throughput_tasks.append(task_hashing)
+else:
+    throughput_tasks.append(task_compress_zlib)
+
+latency_tasks = throughput_tasks
+
+
+class TimedLoop:
+    def __init__(self, func, args):
+        self.func = func
+        self.args = args
+
+    def __call__(self, start_time, min_duration, end_event, do_yield=False):
+        step = 20
+        niters = 0
+        duration = 0.0
+        _time = time.time
+        _sleep = time.sleep
+        _func = self.func
+        _args = self.args
+        t1 = start_time
+        while True:
+            for i in range(step):
+                _func(*_args)
+            t2 = _time()
+            # If another thread terminated, the current measurement is invalid
+            # => return the previous one.
+            if end_event:
+                return niters, duration
+            niters += step
+            duration = t2 - start_time
+            if duration >= min_duration:
+                end_event.append(None)
+                return niters, duration
+            if t2 - t1 < 0.01:
+                # Minimize interference of measurement on overall runtime
+                step = step * 3 // 2
+            elif do_yield:
+                # OS scheduling of Python threads is sometimes so bad that we
+                # have to force thread switching ourselves, otherwise we get
+                # completely useless results.
+                _sleep(0.0001)
+            t1 = t2
+
+
+def run_throughput_test(func, args, nthreads):
+    assert nthreads >= 1
+
+    # Warm up
+    func(*args)
+
+    results = []
+    loop = TimedLoop(func, args)
+    end_event = []
+
+    if nthreads == 1:
+        # Pure single-threaded performance, without any switching or
+        # synchronization overhead.
+        start_time = time.time()
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event, do_yield=False))
+        return results
+
+    started = False
+    ready_cond = threading.Condition()
+    start_cond = threading.Condition()
+    ready = []
+
+    def run():
+        with ready_cond:
+            ready.append(None)
+            ready_cond.notify()
+        with start_cond:
+            while not started:
+                start_cond.wait()
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event, do_yield=True))
+
+    threads = []
+    for i in range(nthreads):
+        threads.append(threading.Thread(target=run))
+    for t in threads:
+        t.setDaemon(True)
+        t.start()
+    # We don't want measurements to include thread startup overhead,
+    # so we arrange for timing to start after all threads are ready.
+    with ready_cond:
+        while len(ready) < nthreads:
+            ready_cond.wait()
+    with start_cond:
+        start_time = time.time()
+        started = True
+        start_cond.notify(nthreads)
+    for t in threads:
+        t.join()
+
+    return results
+
+def run_throughput_tests(max_threads):
+    for task in throughput_tasks:
+        print(task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 1
+        baseline_speed = None
+        while nthreads <= max_threads:
+            results = run_throughput_test(func, args, nthreads)
+            # Taking the max duration rather than average gives pessimistic
+            # results rather than optimistic.
+            speed = sum(r[0] for r in results) / max(r[1] for r in results)
+            print("threads=%d: %d" % (nthreads, speed), end="")
+            if baseline_speed is None:
+                print(" iterations/s.")
+                baseline_speed = speed
+            else:
+                print(" ( %d %%)" % (speed / baseline_speed * 100))
+            nthreads += 1
+        print()
+
+
+LAT_END = "END"
+
+def _sendto(sock, s, addr):
+    sock.sendto(s.encode('ascii'), addr)
+
+def _recv(sock, n):
+    return sock.recv(n).decode('ascii')
+
+def latency_client(addr, nb_pings, interval):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    _time = time.time
+    _sleep = time.sleep
+    def _ping():
+        _sendto(sock, "%r\n" % _time(), addr)
+    # The first ping signals the parent process that we are ready.
+    _ping()
+    # We give the parent a bit of time to notice.
+    _sleep(1.0)
+    for i in range(nb_pings):
+        _sleep(interval)
+        _ping()
+    _sendto(sock, LAT_END + "\n", addr)
+
+def run_latency_client(**kwargs):
+    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+    cmd_line.extend(['--latclient', repr(kwargs)])
+    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_latency_test(func, args, nthreads):
+    # Create a listening socket to receive the pings. We use UDP which should
+    # be painlessly cross-platform.
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(("127.0.0.1", 0))
+    addr = sock.getsockname()
+
+    interval = LATENCY_PING_INTERVAL
+    duration = LATENCY_DURATION
+    nb_pings = int(duration / interval)
+
+    results = []
+    threads = []
+    end_event = []
+    start_cond = threading.Condition()
+    started = False
+    if nthreads > 0:
+        # Warm up
+        func(*args)
+
+        results = []
+        loop = TimedLoop(func, args)
+        ready = []
+        ready_cond = threading.Condition()
+
+        def run():
+            with ready_cond:
+                ready.append(None)
+                ready_cond.notify()
+            with start_cond:
+                while not started:
+                    start_cond.wait()
+            loop(start_time, duration * 1.5, end_event, do_yield=False)
+
+        for i in range(nthreads):
+            threads.append(threading.Thread(target=run))
+        for t in threads:
+            t.setDaemon(True)
+            t.start()
+        # Wait for threads to be ready
+        with ready_cond:
+            while len(ready) < nthreads:
+                ready_cond.wait()
+
+    # Run the client and wait for the first ping(s) to arrive before
+    # unblocking the background threads.
+    chunks = []
+    process = run_latency_client(addr=sock.getsockname(),
+                                 nb_pings=nb_pings, interval=interval)
+    s = _recv(sock, 4096)
+    _time = time.time
+
+    with start_cond:
+        start_time = _time()
+        started = True
+        start_cond.notify(nthreads)
+
+    while LAT_END not in s:
+        s = _recv(sock, 4096)
+        t = _time()
+        chunks.append((t, s))
+
+    # Tell the background threads to stop.
+    end_event.append(None)
+    for t in threads:
+        t.join()
+    process.wait()
+
+    for recv_time, chunk in chunks:
+        # NOTE: it is assumed that a line sent by a client wasn't received
+        # in two chunks because the lines are very small.
+        for line in chunk.splitlines():
+            line = line.strip()
+            if line and line != LAT_END:
+                send_time = eval(line)
+                assert isinstance(send_time, float)
+                results.append((send_time, recv_time))
+
+    return results
+
+def run_latency_tests(max_threads):
+    for task in latency_tasks:
+        print("Background CPU task:", task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 0
+        while nthreads <= max_threads:
+            results = run_latency_test(func, args, nthreads)
+            n = len(results)
+            # We print out milliseconds
+            lats = [1000 * (t2 - t1) for (t1, t2) in results]
+            #print(list(map(int, lats)))
+            avg = sum(lats) / n
+            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
+            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
+            print()
+            #print("    [... from %d samples]" % n)
+            nthreads += 1
+        print()
+
+
+def main():
+    usage = "usage: %prog [-h|--help] [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-t", "--throughput",
+                      action="store_true", dest="throughput", default=False,
+                      help="run throughput tests")
+    parser.add_option("-l", "--latency",
+                      action="store_true", dest="latency", default=False,
+                      help="run latency tests")
+    parser.add_option("-i", "--interval",
+                      action="store", type="int", dest="check_interval", default=None,
+                      help="sys.setcheckinterval() value")
+    parser.add_option("-I", "--switch-interval",
+                      action="store", type="float", dest="switch_interval", default=None,
+                      help="sys.setswitchinterval() value")
+    parser.add_option("-n", "--num-threads",
+                      action="store", type="int", dest="nthreads", default=4,
+                      help="max number of threads in tests")
+
+    # Hidden option to run the pinging client
+    parser.add_option("", "--latclient",
+                      action="store", dest="latclient", default=None,
+                      help=SUPPRESS_HELP)
+
+    options, args = parser.parse_args()
+    if args:
+        parser.error("unexpected arguments")
+
+    if options.latclient:
+        kwargs = eval(options.latclient)
+        latency_client(**kwargs)
+        return
+
+    if not options.throughput and not options.latency:
+        options.throughput = options.latency = True
+    if options.check_interval:
+        sys.setcheckinterval(options.check_interval)
+    if options.switch_interval:
+        sys.setswitchinterval(options.switch_interval)
+
+    print("== %s %s (%s) ==" % (
+        platform.python_implementation(),
+        platform.python_version(),
+        platform.python_build()[0],
+    ))
+    # Processor identification often has repeated spaces
+    cpu = ' '.join(platform.processor().split())
+    print("== %s %s on '%s' ==" % (
+        platform.machine(),
+        platform.system(),
+        cpu,
+    ))
+    print()
+
+    if options.throughput:
+        print("--- Throughput ---")
+        print()
+        run_throughput_tests(options.nthreads)
+
+    if options.latency:
+        print("--- Latency ---")
+        print()
+        run_latency_tests(options.nthreads)
+
+if __name__ == "__main__":
+    main()