]> granicus.if.org Git - python/commitdiff
Brett Cannon's dummy_thread and dummy_threading modules (SF patch
authorGuido van Rossum <guido@python.org>
Mon, 30 Dec 2002 22:30:22 +0000 (22:30 +0000)
committerGuido van Rossum <guido@python.org>
Mon, 30 Dec 2002 22:30:22 +0000 (22:30 +0000)
622537), with some nitpicking editorial changes.

Doc/lib/libdummythread.tex [new file with mode: 0644]
Doc/lib/libdummythreading.tex [new file with mode: 0644]
Lib/dummy_thread.py [new file with mode: 0644]
Lib/dummy_threading.py [new file with mode: 0644]
Lib/test/test_dummy_thread.py [new file with mode: 0644]
Lib/test/test_dummy_threading.py [new file with mode: 0644]

diff --git a/Doc/lib/libdummythread.tex b/Doc/lib/libdummythread.tex
new file mode 100644 (file)
index 0000000..56e3a03
--- /dev/null
@@ -0,0 +1,22 @@
+\section{\module{dummy_thread} ---
+         Drop-in replacement for the \module{thread} module}
+
+\declaremodule[dummythread]{standard}{dummy_thread}
+\modulesynopsis{Drop-in replacement for the thread module.}
+
+This module provides a duplicate interface to the \refmodule{thread} module. It
+is meant to be imported when the \module{thread} module is not provided on a
+platform.
+
+Suggested usage is:
+
+\begin{verbatim}
+try:
+    import thread as _thread
+except ImportError:
+    import dummy_thread as _thread
+\end{verbatim}
+
+Be careful to not use this module where deadlock might occur from a thread 
+being created that blocks waiting for another thread to be created.  This 
+often occurs with blocking I/O.
diff --git a/Doc/lib/libdummythreading.tex b/Doc/lib/libdummythreading.tex
new file mode 100644 (file)
index 0000000..7eb7076
--- /dev/null
@@ -0,0 +1,22 @@
+\section{\module{dummy_thread} ---
+         Drop-in replacement for the \module{threading} module}
+
+\declaremodule[dummythreading]{standard}{dummy_threading}
+\modulesynopsis{Drop-in replacement for the threading module.}
+
+This module provides a duplicate interface to the \refmodule{threading} module. It
+is meant to be imported when the \module{threading} module is not provided on a
+platform.
+
+Suggested usage is:
+
+\begin{verbatim}
+try:
+    import threading as _threading
+except ImportError:
+    import dummy_threading as _threading
+\end{verbatim}
+
+Be careful to not use this module where deadlock might occur from a thread 
+being created that blocks waiting for another thread to be created.  This 
+often occurs with blocking I/O.
diff --git a/Lib/dummy_thread.py b/Lib/dummy_thread.py
new file mode 100644 (file)
index 0000000..b0ba0ce
--- /dev/null
@@ -0,0 +1,116 @@
+"""Drop-in replacement for the thread module.
+
+Meant to be used as a brain-dead substitute so that threaded code does
+not need to be rewritten for when the thread module is not present.
+
+Suggested usage is::
+    
+    try:
+        import thread
+    except ImportError:
+        import dummy_thread as thread
+
+"""
+__author__ = "Brett Cannon"
+__email__ = "brett@python.org"
+
+# Exports only things specified by thread documentation
+# (skipping obsolete synonyms allocate(), start_new(), exit_thread())
+__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
+           'LockType']
+
+import traceback as _traceback
+
+class error(Exception):
+    """Dummy implementation of thread.error."""
+
+    def __init__(self, *args):
+        self.args = args
+
+def start_new_thread(function, args, kwargs={}):
+    """Dummy implementation of thread.start_new_thread().
+
+    Compatibility is maintained by making sure that ``args`` is a
+    tuple and ``kwargs`` is a dictionary.  If an exception is raised
+    and it is SystemExit (which can be done by thread.exit()) it is
+    caught and nothing is done; all other exceptions are printed out
+    by using traceback.print_exc().
+
+    """
+    if type(args) != type(tuple()):
+        raise TypeError("2nd arg must be a tuple")
+    if type(kwargs) != type(dict()):
+        raise TypeError("3rd arg must be a dict")
+    try:
+        function(*args, **kwargs)
+    except SystemExit:
+        pass
+    except:
+        _traceback.print_exc()
+
+def exit():
+    """Dummy implementation of thread.exit()."""
+    raise SystemExit
+
+def get_ident():
+    """Dummy implementation of thread.get_ident().
+
+    Since this module should only be used when threadmodule is not
+    available, it is safe to assume that the current process is the
+    only thread.  Thus a constant can be safely returned.
+    """
+    return -1
+
+def allocate_lock():
+    """Dummy implementation of thread.allocate_lock()."""
+    return LockType()
+
+class LockType(object):
+    """Class implementing dummy implementation of thread.LockType.
+    
+    Compatibility is maintained by maintaining self.locked_status
+    which is a boolean that stores the state of the lock.  Pickling of
+    the lock, though, should not be done since if the thread module is
+    then used with an unpickled ``lock()`` from here problems could
+    occur from this class not having atomic methods.
+
+    """
+
+    def __init__(self):
+        self.locked_status = False
+    
+    def acquire(self, waitflag=None):
+        """Dummy implementation of acquire().
+
+        For blocking calls, self.locked_status is automatically set to
+        True and returned appropriately based on value of
+        ``waitflag``.  If it is non-blocking, then the value is
+        actually checked and not set if it is already acquired.  This
+        is all done so that threading.Condition's assert statements
+        aren't triggered and throw a little fit.
+
+        """
+        if waitflag is None:
+            self.locked_status = True
+            return None 
+        elif not waitflag:
+            if not self.locked_status:
+                self.locked_status = True
+                return True
+            else:
+                return False
+        else:
+            self.locked_status = True
+            return True 
+
+    def release(self):
+        """Release the dummy lock."""
+        # XXX Perhaps shouldn't actually bother to test?  Could lead
+        #     to problems for complex, threaded code.
+        if not self.locked_status:
+            raise error
+        self.locked_status = False
+        return True
+
+    def locked(self):
+        return self.locked_status
diff --git a/Lib/dummy_threading.py b/Lib/dummy_threading.py
new file mode 100644 (file)
index 0000000..2e070aa
--- /dev/null
@@ -0,0 +1,63 @@
+"""Faux ``threading`` version using ``dummy_thread`` instead of ``thread``.
+
+The module ``_dummy_threading`` is added to ``sys.modules`` in order
+to not have ``threading`` considered imported.  Had ``threading`` been
+directly imported it would have made all subsequent imports succeed
+regardless of whether ``thread`` was available which is not desired.
+
+:Author: Brett Cannon
+:Contact: brett@python.org
+
+XXX: Try to get rid of ``_dummy_threading``.
+
+"""
+from sys import modules as sys_modules
+
+import dummy_thread
+
+# Declaring now so as to not have to nest ``try``s to get proper clean-up.
+holding_thread = False
+holding_threading = False
+
+try:
+    # Could have checked if ``thread`` was not in sys.modules and gone
+    # a different route, but decided to mirror technique used with
+    # ``threading`` below.
+    if 'thread' in sys_modules:
+        held_thread = sys_modules['thread']
+        holding_thread = True
+    # Must have some module named ``thread`` that implements its API
+    # in order to initially import ``threading``.
+    sys_modules['thread'] = sys_modules['dummy_thread']
+
+    if 'threading' in sys_modules:
+        # If ``threading`` is already imported, might as well prevent
+        # trying to import it more than needed by saving it if it is
+        # already imported before deleting it.
+        held_threading = sys_modules['threading']
+        holding_threading = True
+        del sys_modules['threading']
+    import threading
+    # Need a copy of the code kept somewhere...
+    sys_modules['_dummy_threading'] = sys_modules['threading']
+    del sys_modules['threading']
+    from _dummy_threading import *
+    from _dummy_threading import __all__
+
+finally:
+    # Put back ``threading`` if we overwrote earlier
+    if holding_threading:
+        sys_modules['threading'] = held_threading
+        del held_threading
+    del holding_threading
+
+    # Put back ``thread`` if we overwrote, else del the entry we made
+    if holding_thread:
+        sys_modules['thread'] = held_thread
+        del held_thread
+    else:
+        del sys_modules['thread']
+    del holding_thread
+
+    del dummy_thread
+    del sys_modules
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
new file mode 100644 (file)
index 0000000..3be3931
--- /dev/null
@@ -0,0 +1,167 @@
+"""Generic thread tests.
+
+Meant to be used by dummy_thread and thread.  To allow for different modules
+to be used, test_main() can be called with the module to use as the thread
+implementation as its sole argument.
+
+"""
+import dummy_thread as _thread
+import time
+import Queue
+import random
+import unittest
+from test import test_support
+
+
+class LockTests(unittest.TestCase):
+    """Test lock objects."""
+
+    def setUp(self):
+        # Create a lock
+        self.lock = _thread.allocate_lock()
+
+    def test_initlock(self):
+        #Make sure locks start locked
+        self.failUnless(not self.lock.locked(),
+                        "Lock object is not initialized unlocked.")
+
+    def test_release(self):
+        # Test self.lock.release()
+        self.lock.acquire()
+        self.lock.release()
+        self.failUnless(not self.lock.locked(),
+                        "Lock object did not release properly.")
+    
+    def test_improper_release(self):
+        #Make sure release of an unlocked thread raises _thread.error
+        self.failUnlessRaises(_thread.error, self.lock.release)
+
+    def test_cond_acquire_success(self):
+        #Make sure the conditional acquiring of the lock works.
+        self.failUnless(self.lock.acquire(0),
+                        "Conditional acquiring of the lock failed.")
+
+    def test_cond_acquire_fail(self):
+        #Test acquiring locked lock returns False
+        self.lock.acquire(0)
+        self.failUnless(not self.lock.acquire(0),
+                        "Conditional acquiring of a locked lock incorrectly "
+                         "succeeded.")
+
+    def test_uncond_acquire_success(self):
+        #Make sure unconditional acquiring of a lock works.
+        self.lock.acquire()
+        self.failUnless(self.lock.locked(),
+                        "Uncondional locking failed.")
+
+    def test_uncond_acquire_return_val(self):
+        #Make sure that an unconditional locking returns True.
+        self.failUnless(self.lock.acquire(1) is True,
+                        "Unconditional locking did not return True.")
+    
+    def test_uncond_acquire_blocking(self):
+        #Make sure that unconditional acquiring of a locked lock blocks.
+        def delay_unlock(to_unlock, delay):
+            """Hold on to lock for a set amount of time before unlocking."""
+            time.sleep(delay)
+            to_unlock.release()
+
+        self.lock.acquire()
+        delay = 1  #In seconds
+        start_time = int(time.time())
+        _thread.start_new_thread(delay_unlock,(self.lock, delay))
+        if test_support.verbose:
+            print
+            print "*** Waiting for thread to release the lock "\
+            "(approx. %s sec.) ***" % delay
+        self.lock.acquire()
+        end_time = int(time.time())
+        if test_support.verbose:
+            print "done"
+        self.failUnless((end_time - start_time) >= delay,
+                        "Blocking by unconditional acquiring failed.")
+
+class MiscTests(unittest.TestCase):
+    """Miscellaneous tests."""
+
+    def test_exit(self):
+        #Make sure _thread.exit() raises SystemExit
+        self.failUnlessRaises(SystemExit, _thread.exit)
+
+    def test_ident(self):
+        #Test sanity of _thread.get_ident()
+        self.failUnless(isinstance(_thread.get_ident(), int),
+                        "_thread.get_ident() returned a non-integer")
+        self.failUnless(_thread.get_ident() != 0,
+                        "_thread.get_ident() returned 0")
+
+    def test_LockType(self):
+        #Make sure _thread.LockType is the same type as _thread.allocate_locke()
+        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
+                        "_thread.LockType is not an instance of what is "
+                         "returned by _thread.allocate_lock()")
+
+class ThreadTests(unittest.TestCase):
+    """Test thread creation."""
+
+    def test_arg_passing(self):
+        #Make sure that parameter passing works.
+        def arg_tester(queue, arg1=False, arg2=False):
+            """Use to test _thread.start_new_thread() passes args properly."""
+            queue.put((arg1, arg2))
+
+        testing_queue = Queue.Queue(1)
+        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
+        result = testing_queue.get()
+        self.failUnless(result[0] and result[1],
+                        "Argument passing for thread creation using tuple failed")
+        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
+                                                       'arg1':True, 'arg2':True})
+        result = testing_queue.get()
+        self.failUnless(result[0] and result[1],
+                        "Argument passing for thread creation using kwargs failed")
+        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
+        result = testing_queue.get()
+        self.failUnless(result[0] and result[1],
+                        "Argument passing for thread creation using both tuple"
+                        " and kwargs failed")
+    
+    def test_multi_creation(self):
+        #Make sure multiple threads can be created.
+        def queue_mark(queue, delay):
+            """Wait for ``delay`` seconds and then put something into ``queue``"""
+            time.sleep(delay)
+            queue.put(_thread.get_ident())
+        
+        thread_count = 5
+        delay = 1.5
+        testing_queue = Queue.Queue(thread_count)
+        if test_support.verbose:
+            print
+            print "*** Testing multiple thread creation "\
+            "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
+        for count in xrange(thread_count):
+            _thread.start_new_thread(queue_mark,
+                                     (testing_queue, round(random.random(), 1)))
+        time.sleep(delay)
+        if test_support.verbose:
+            print 'done'
+        self.failUnless(testing_queue.qsize() == thread_count,
+                        "Not all %s threads executed properly after %s sec." % 
+                        (thread_count, delay))
+
+def test_main(imported_module=None):
+    global _thread
+    if imported_module:
+        _thread = imported_module
+    if test_support.verbose:
+        print
+        print "*** Using %s as _thread module ***" % _thread
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(LockTests))
+    suite.addTest(unittest.makeSuite(MiscTests))
+    suite.addTest(unittest.makeSuite(ThreadTests))
+    test_support.run_suite(suite)
+
+if __name__ == '__main__':
+    test_main()
diff --git a/Lib/test/test_dummy_threading.py b/Lib/test/test_dummy_threading.py
new file mode 100644 (file)
index 0000000..cc8c0e3
--- /dev/null
@@ -0,0 +1,70 @@
+# Very rudimentary test of threading module
+
+# Create a bunch of threads, let each do some work, wait until all are done
+
+from test.test_support import verbose
+import random
+import dummy_threading as _threading
+import time
+
+
+class TestThread(_threading.Thread):
+    
+    def run(self):
+        global running
+        delay = random.random() * 2
+        if verbose:
+            print 'task', self.getName(), 'will run for', delay, 'sec'
+        sema.acquire()
+        mutex.acquire()
+        running = running + 1
+        if verbose:
+            print running, 'tasks are running'
+        mutex.release()
+        time.sleep(delay)
+        if verbose:
+            print 'task', self.getName(), 'done'
+        mutex.acquire()
+        running = running - 1
+        if verbose:
+            print self.getName(), 'is finished.', running, 'tasks are running'
+        mutex.release()
+        sema.release()
+
+def starttasks():
+    for i in range(numtasks):
+        t = TestThread(name="<thread %d>"%i)
+        threads.append(t)
+        t.start()
+
+
+def test_main():
+    # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
+    # about 1 second per clump).
+    global numtasks
+    numtasks = 10
+
+    # no more than 3 of the 10 can run at once
+    global sema
+    sema = _threading.BoundedSemaphore(value=3)
+    global mutex
+    mutex = _threading.RLock()
+    global running
+    running = 0
+
+    global threads
+    threads = []
+    
+    starttasks()
+
+    if verbose:
+        print 'waiting for all tasks to complete'
+    for t in threads:
+        t.join()
+    if verbose:
+        print 'all tasks done'
+
+
+
+if __name__ == '__main__':
+    test_main()