]> granicus.if.org Git - python/commitdiff
add a few test cases for threading module.
authorSkip Montanaro <skip@pobox.com>
Mon, 20 Aug 2001 20:28:48 +0000 (20:28 +0000)
committerSkip Montanaro <skip@pobox.com>
Mon, 20 Aug 2001 20:28:48 +0000 (20:28 +0000)
Lib/test/test_threading.py [new file with mode: 0644]

diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
new file mode 100644 (file)
index 0000000..87262b1
--- /dev/null
@@ -0,0 +1,52 @@
+# Very rudimentary test of threading module
+
+# Create a bunch of threads, let each do some work, wait until all are done
+
+from test_support import verbose
+import random
+import threading
+import time
+
+numtasks = 10
+
+# no more than 3 of the 10 can run at once
+sema = threading.BoundedSemaphore(value=3)
+mutex = threading.RLock()
+running = 0
+
+class TestThread(threading.Thread):
+    def run(self):
+        global running
+        delay = random.random() * numtasks
+        if verbose:
+            print 'task', self.getName(), 'will run for', round(delay, 1), 'sec'
+        sema.acquire()
+        mutex.acquire()
+        running = running + 1
+        if verbose:
+            print running, 'tasks are running'
+        mutex.release()
+        time.sleep(delay)
+        if verbose:
+            print 'task', self.getName(), 'done'
+        mutex.acquire()
+        running = running - 1
+        if verbose:
+            print self.getName(), 'is finished.', running, 'tasks are running'
+        mutex.release()
+        sema.release()
+
+threads = []
+def starttasks():
+    for i in range(numtasks):
+        t = TestThread(name="<thread %d>"%i)
+        threads.append(t)
+        t.start()
+
+starttasks()
+
+print 'waiting for all tasks to complete'
+for t in threads:
+    t.join()
+print 'all tasks done'
+