]> granicus.if.org Git - python/commitdiff
Set eol-style correctly for mp_distributing.py.
authorGeorg Brandl <georg@python.org>
Sat, 3 Jan 2009 19:10:12 +0000 (19:10 +0000)
committerGeorg Brandl <georg@python.org>
Sat, 3 Jan 2009 19:10:12 +0000 (19:10 +0000)
Doc/includes/mp_distributing.py

index 5ec718bf47d74d4d46dc8a175d7c6258ffb0a3cd..43c7ad13a9feb4c3dceb4950d8702af5b8d94be5 100644 (file)
-#\r
-# Module to allow spawning of processes on foreign host\r
-#\r
-# Depends on `multiprocessing` package -- tested with `processing-0.60`\r
-#\r
-# Copyright (c) 2006-2008, R Oudkerk\r
-# All rights reserved.\r
-#\r
-\r
-__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']\r
-\r
-#\r
-# Imports\r
-#\r
-\r
-import sys\r
-import os\r
-import tarfile\r
-import shutil\r
-import subprocess\r
-import logging\r
-import itertools\r
-import Queue\r
-\r
-try:\r
-    import cPickle as pickle\r
-except ImportError:\r
-    import pickle\r
-\r
-from multiprocessing import Process, current_process, cpu_count\r
-from multiprocessing import util, managers, connection, forking, pool\r
-\r
-#\r
-# Logging\r
-#\r
-\r
-def get_logger():\r
-    return _logger\r
-\r
-_logger = logging.getLogger('distributing')\r
-_logger.propogate = 0\r
-\r
-_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)\r
-_handler = logging.StreamHandler()\r
-_handler.setFormatter(_formatter)\r
-_logger.addHandler(_handler)\r
-\r
-info = _logger.info\r
-debug = _logger.debug\r
-\r
-#\r
-# Get number of cpus\r
-#\r
-\r
-try:\r
-    slot_count = cpu_count()\r
-except NotImplemented:\r
-    slot_count = 1\r
-        \r
-#\r
-# Manager type which spawns subprocesses\r
-#\r
-\r
-class HostManager(managers.SyncManager):\r
-    '''\r
-    Manager type used for spawning processes on a (presumably) foreign host\r
-    '''    \r
-    def __init__(self, address, authkey):\r
-        managers.SyncManager.__init__(self, address, authkey)\r
-        self._name = 'Host-unknown'\r
-\r
-    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):\r
-        if hasattr(sys.modules['__main__'], '__file__'):\r
-            main_path = os.path.basename(sys.modules['__main__'].__file__)\r
-        else:\r
-            main_path = None\r
-        data = pickle.dumps((target, args, kwargs))\r
-        p = self._RemoteProcess(data, main_path)\r
-        if name is None:\r
-            temp = self._name.split('Host-')[-1] + '/Process-%s'\r
-            name = temp % ':'.join(map(str, p.get_identity()))\r
-        p.set_name(name)\r
-        return p\r
-\r
-    @classmethod\r
-    def from_address(cls, address, authkey):\r
-        manager = cls(address, authkey)\r
-        managers.transact(address, authkey, 'dummy')\r
-        manager._state.value = managers.State.STARTED\r
-        manager._name = 'Host-%s:%s' % manager.address\r
-        manager.shutdown = util.Finalize(\r
-            manager, HostManager._finalize_host,\r
-            args=(manager._address, manager._authkey, manager._name),\r
-            exitpriority=-10\r
-            )\r
-        return manager\r
-\r
-    @staticmethod\r
-    def _finalize_host(address, authkey, name):\r
-        managers.transact(address, authkey, 'shutdown')\r
-        \r
-    def __repr__(self):\r
-        return '<Host(%s)>' % self._name\r
-\r
-#\r
-# Process subclass representing a process on (possibly) a remote machine\r
-#\r
-\r
-class RemoteProcess(Process):\r
-    '''\r
-    Represents a process started on a remote host\r
-    '''\r
-    def __init__(self, data, main_path):\r
-        assert not main_path or os.path.basename(main_path) == main_path\r
-        Process.__init__(self)\r
-        self._data = data\r
-        self._main_path = main_path\r
-        \r
-    def _bootstrap(self):\r
-        forking.prepare({'main_path': self._main_path})\r
-        self._target, self._args, self._kwargs = pickle.loads(self._data)\r
-        return Process._bootstrap(self)\r
-        \r
-    def get_identity(self):\r
-        return self._identity\r
-\r
-HostManager.register('_RemoteProcess', RemoteProcess)\r
-\r
-#\r
-# A Pool class that uses a cluster\r
-#\r
-\r
-class DistributedPool(pool.Pool):\r
-    \r
-    def __init__(self, cluster, processes=None, initializer=None, initargs=()):\r
-        self._cluster = cluster\r
-        self.Process = cluster.Process\r
-        pool.Pool.__init__(self, processes or len(cluster),\r
-                           initializer, initargs)\r
-        \r
-    def _setup_queues(self):\r
-        self._inqueue = self._cluster._SettableQueue()\r
-        self._outqueue = self._cluster._SettableQueue()\r
-        self._quick_put = self._inqueue.put\r
-        self._quick_get = self._outqueue.get\r
-\r
-    @staticmethod\r
-    def _help_stuff_finish(inqueue, task_handler, size):\r
-        inqueue.set_contents([None] * size)\r
-\r
-#\r
-# Manager type which starts host managers on other machines\r
-#\r
-\r
-def LocalProcess(**kwds):\r
-    p = Process(**kwds)\r
-    p.set_name('localhost/' + p.name)\r
-    return p\r
-\r
-class Cluster(managers.SyncManager):\r
-    '''\r
-    Represents collection of slots running on various hosts.\r
-    \r
-    `Cluster` is a subclass of `SyncManager` so it allows creation of\r
-    various types of shared objects.\r
-    '''\r
-    def __init__(self, hostlist, modules):\r
-        managers.SyncManager.__init__(self, address=('localhost', 0))\r
-        self._hostlist = hostlist\r
-        self._modules = modules\r
-        if __name__ not in modules:\r
-            modules.append(__name__)\r
-        files = [sys.modules[name].__file__ for name in modules]\r
-        for i, file in enumerate(files):\r
-            if file.endswith('.pyc') or file.endswith('.pyo'):\r
-                files[i] = file[:-4] + '.py'\r
-        self._files = [os.path.abspath(file) for file in files]\r
-        \r
-    def start(self):\r
-        managers.SyncManager.start(self)\r
-        \r
-        l = connection.Listener(family='AF_INET', authkey=self._authkey)\r
-        \r
-        for i, host in enumerate(self._hostlist):\r
-            host._start_manager(i, self._authkey, l.address, self._files)\r
-\r
-        for host in self._hostlist:\r
-            if host.hostname != 'localhost':\r
-                conn = l.accept()\r
-                i, address, cpus = conn.recv()\r
-                conn.close()\r
-                other_host = self._hostlist[i]\r
-                other_host.manager = HostManager.from_address(address,\r
-                                                              self._authkey)\r
-                other_host.slots = other_host.slots or cpus\r
-                other_host.Process = other_host.manager.Process\r
-            else:\r
-                host.slots = host.slots or slot_count\r
-                host.Process = LocalProcess\r
-\r
-        self._slotlist = [\r
-            Slot(host) for host in self._hostlist for i in range(host.slots)\r
-            ]\r
-        self._slot_iterator = itertools.cycle(self._slotlist)\r
-        self._base_shutdown = self.shutdown\r
-        del self.shutdown\r
-        \r
-    def shutdown(self):\r
-        for host in self._hostlist:\r
-            if host.hostname != 'localhost':\r
-                host.manager.shutdown()\r
-        self._base_shutdown()\r
-        \r
-    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):\r
-        slot = self._slot_iterator.next()\r
-        return slot.Process(\r
-            group=group, target=target, name=name, args=args, kwargs=kwargs\r
-            )\r
-\r
-    def Pool(self, processes=None, initializer=None, initargs=()):\r
-        return DistributedPool(self, processes, initializer, initargs)\r
-    \r
-    def __getitem__(self, i):\r
-        return self._slotlist[i]\r
-\r
-    def __len__(self):\r
-        return len(self._slotlist)\r
-\r
-    def __iter__(self):\r
-        return iter(self._slotlist)\r
-\r
-#\r
-# Queue subclass used by distributed pool\r
-#\r
-\r
-class SettableQueue(Queue.Queue):\r
-    def empty(self):\r
-        return not self.queue\r
-    def full(self):\r
-        return self.maxsize > 0 and len(self.queue) == self.maxsize\r
-    def set_contents(self, contents):\r
-        # length of contents must be at least as large as the number of\r
-        # threads which have potentially called get()\r
-        self.not_empty.acquire()\r
-        try:\r
-            self.queue.clear()\r
-            self.queue.extend(contents)\r
-            self.not_empty.notifyAll()\r
-        finally:\r
-            self.not_empty.release()\r
-            \r
-Cluster.register('_SettableQueue', SettableQueue)\r
-\r
-#\r
-# Class representing a notional cpu in the cluster\r
-#\r
-\r
-class Slot(object):\r
-    def __init__(self, host):\r
-        self.host = host\r
-        self.Process = host.Process\r
-\r
-#\r
-# Host\r
-#\r
-\r
-class Host(object):\r
-    '''\r
-    Represents a host to use as a node in a cluster.\r
-\r
-    `hostname` gives the name of the host.  If hostname is not\r
-    "localhost" then ssh is used to log in to the host.  To log in as\r
-    a different user use a host name of the form\r
-    "username@somewhere.org"\r
-\r
-    `slots` is used to specify the number of slots for processes on\r
-    the host.  This affects how often processes will be allocated to\r
-    this host.  Normally this should be equal to the number of cpus on\r
-    that host.\r
-    '''\r
-    def __init__(self, hostname, slots=None):\r
-        self.hostname = hostname\r
-        self.slots = slots\r
-        \r
-    def _start_manager(self, index, authkey, address, files):\r
-        if self.hostname != 'localhost':\r
-            tempdir = copy_to_remote_temporary_directory(self.hostname, files)\r
-            debug('startup files copied to %s:%s', self.hostname, tempdir)\r
-            p = subprocess.Popen(\r
-                ['ssh', self.hostname, 'python', '-c',\r
-                 '"import os; os.chdir(%r); '\r
-                 'from distributing import main; main()"' % tempdir],\r
-                stdin=subprocess.PIPE\r
-                )\r
-            data = dict(\r
-                name='BoostrappingHost', index=index,\r
-                dist_log_level=_logger.getEffectiveLevel(),\r
-                dir=tempdir, authkey=str(authkey), parent_address=address\r
-                )\r
-            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)\r
-            p.stdin.close()\r
-\r
-#\r
-# Copy files to remote directory, returning name of directory\r
-#\r
-\r
-unzip_code = '''"\r
-import tempfile, os, sys, tarfile\r
-tempdir = tempfile.mkdtemp(prefix='distrib-')\r
-os.chdir(tempdir)\r
-tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')\r
-for ti in tf:\r
-    tf.extract(ti)\r
-print tempdir\r
-"'''\r
-\r
-def copy_to_remote_temporary_directory(host, files):\r
-    p = subprocess.Popen(\r
-        ['ssh', host, 'python', '-c', unzip_code],\r
-        stdout=subprocess.PIPE, stdin=subprocess.PIPE\r
-        )\r
-    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')\r
-    for name in files:\r
-        tf.add(name, os.path.basename(name))\r
-    tf.close()\r
-    p.stdin.close()\r
-    return p.stdout.read().rstrip()\r
-\r
-#\r
-# Code which runs a host manager\r
-#\r
-\r
-def main():   \r
-    # get data from parent over stdin\r
-    data = pickle.load(sys.stdin)\r
-    sys.stdin.close()\r
-\r
-    # set some stuff\r
-    _logger.setLevel(data['dist_log_level'])\r
-    forking.prepare(data)\r
-    \r
-    # create server for a `HostManager` object\r
-    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])\r
-    current_process()._server = server\r
-    \r
-    # report server address and number of cpus back to parent\r
-    conn = connection.Client(data['parent_address'], authkey=data['authkey'])\r
-    conn.send((data['index'], server.address, slot_count))\r
-    conn.close()\r
-    \r
-    # set name etc\r
-    current_process().set_name('Host-%s:%s' % server.address)\r
-    util._run_after_forkers()\r
-    \r
-    # register a cleanup function\r
-    def cleanup(directory):\r
-        debug('removing directory %s', directory)\r
-        shutil.rmtree(directory)\r
-        debug('shutting down host manager')\r
-    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)\r
-    \r
-    # start host manager\r
-    debug('remote host manager starting in %s', data['dir'])\r
-    server.serve_forever()\r
+#
+# Module to allow spawning of processes on foreign host
+#
+# Depends on `multiprocessing` package -- tested with `processing-0.60`
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+
+__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
+
+#
+# Imports
+#
+
+import sys
+import os
+import tarfile
+import shutil
+import subprocess
+import logging
+import itertools
+import Queue
+
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+from multiprocessing import Process, current_process, cpu_count
+from multiprocessing import util, managers, connection, forking, pool
+
+#
+# Logging
+#
+
+def get_logger():
+    return _logger
+
+_logger = logging.getLogger('distributing')
+_logger.propogate = 0
+
+_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
+_handler = logging.StreamHandler()
+_handler.setFormatter(_formatter)
+_logger.addHandler(_handler)
+
+info = _logger.info
+debug = _logger.debug
+
+#
+# Get number of cpus
+#
+
+try:
+    slot_count = cpu_count()
+except NotImplemented:
+    slot_count = 1
+
+#
+# Manager type which spawns subprocesses
+#
+
+class HostManager(managers.SyncManager):
+    '''
+    Manager type used for spawning processes on a (presumably) foreign host
+    '''
+    def __init__(self, address, authkey):
+        managers.SyncManager.__init__(self, address, authkey)
+        self._name = 'Host-unknown'
+
+    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+        if hasattr(sys.modules['__main__'], '__file__'):
+            main_path = os.path.basename(sys.modules['__main__'].__file__)
+        else:
+            main_path = None
+        data = pickle.dumps((target, args, kwargs))
+        p = self._RemoteProcess(data, main_path)
+        if name is None:
+            temp = self._name.split('Host-')[-1] + '/Process-%s'
+            name = temp % ':'.join(map(str, p.get_identity()))
+        p.set_name(name)
+        return p
+
+    @classmethod
+    def from_address(cls, address, authkey):
+        manager = cls(address, authkey)
+        managers.transact(address, authkey, 'dummy')
+        manager._state.value = managers.State.STARTED
+        manager._name = 'Host-%s:%s' % manager.address
+        manager.shutdown = util.Finalize(
+            manager, HostManager._finalize_host,
+            args=(manager._address, manager._authkey, manager._name),
+            exitpriority=-10
+            )
+        return manager
+
+    @staticmethod
+    def _finalize_host(address, authkey, name):
+        managers.transact(address, authkey, 'shutdown')
+
+    def __repr__(self):
+        return '<Host(%s)>' % self._name
+
+#
+# Process subclass representing a process on (possibly) a remote machine
+#
+
+class RemoteProcess(Process):
+    '''
+    Represents a process started on a remote host
+    '''
+    def __init__(self, data, main_path):
+        assert not main_path or os.path.basename(main_path) == main_path
+        Process.__init__(self)
+        self._data = data
+        self._main_path = main_path
+
+    def _bootstrap(self):
+        forking.prepare({'main_path': self._main_path})
+        self._target, self._args, self._kwargs = pickle.loads(self._data)
+        return Process._bootstrap(self)
+
+    def get_identity(self):
+        return self._identity
+
+HostManager.register('_RemoteProcess', RemoteProcess)
+
+#
+# A Pool class that uses a cluster
+#
+
+class DistributedPool(pool.Pool):
+
+    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
+        self._cluster = cluster
+        self.Process = cluster.Process
+        pool.Pool.__init__(self, processes or len(cluster),
+                           initializer, initargs)
+
+    def _setup_queues(self):
+        self._inqueue = self._cluster._SettableQueue()
+        self._outqueue = self._cluster._SettableQueue()
+        self._quick_put = self._inqueue.put
+        self._quick_get = self._outqueue.get
+
+    @staticmethod
+    def _help_stuff_finish(inqueue, task_handler, size):
+        inqueue.set_contents([None] * size)
+
+#
+# Manager type which starts host managers on other machines
+#
+
+def LocalProcess(**kwds):
+    p = Process(**kwds)
+    p.set_name('localhost/' + p.name)
+    return p
+
+class Cluster(managers.SyncManager):
+    '''
+    Represents collection of slots running on various hosts.
+
+    `Cluster` is a subclass of `SyncManager` so it allows creation of
+    various types of shared objects.
+    '''
+    def __init__(self, hostlist, modules):
+        managers.SyncManager.__init__(self, address=('localhost', 0))
+        self._hostlist = hostlist
+        self._modules = modules
+        if __name__ not in modules:
+            modules.append(__name__)
+        files = [sys.modules[name].__file__ for name in modules]
+        for i, file in enumerate(files):
+            if file.endswith('.pyc') or file.endswith('.pyo'):
+                files[i] = file[:-4] + '.py'
+        self._files = [os.path.abspath(file) for file in files]
+
+    def start(self):
+        managers.SyncManager.start(self)
+
+        l = connection.Listener(family='AF_INET', authkey=self._authkey)
+
+        for i, host in enumerate(self._hostlist):
+            host._start_manager(i, self._authkey, l.address, self._files)
+
+        for host in self._hostlist:
+            if host.hostname != 'localhost':
+                conn = l.accept()
+                i, address, cpus = conn.recv()
+                conn.close()
+                other_host = self._hostlist[i]
+                other_host.manager = HostManager.from_address(address,
+                                                              self._authkey)
+                other_host.slots = other_host.slots or cpus
+                other_host.Process = other_host.manager.Process
+            else:
+                host.slots = host.slots or slot_count
+                host.Process = LocalProcess
+
+        self._slotlist = [
+            Slot(host) for host in self._hostlist for i in range(host.slots)
+            ]
+        self._slot_iterator = itertools.cycle(self._slotlist)
+        self._base_shutdown = self.shutdown
+        del self.shutdown
+
+    def shutdown(self):
+        for host in self._hostlist:
+            if host.hostname != 'localhost':
+                host.manager.shutdown()
+        self._base_shutdown()
+
+    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+        slot = self._slot_iterator.next()
+        return slot.Process(
+            group=group, target=target, name=name, args=args, kwargs=kwargs
+            )
+
+    def Pool(self, processes=None, initializer=None, initargs=()):
+        return DistributedPool(self, processes, initializer, initargs)
+
+    def __getitem__(self, i):
+        return self._slotlist[i]
+
+    def __len__(self):
+        return len(self._slotlist)
+
+    def __iter__(self):
+        return iter(self._slotlist)
+
+#
+# Queue subclass used by distributed pool
+#
+
+class SettableQueue(Queue.Queue):
+    def empty(self):
+        return not self.queue
+    def full(self):
+        return self.maxsize > 0 and len(self.queue) == self.maxsize
+    def set_contents(self, contents):
+        # length of contents must be at least as large as the number of
+        # threads which have potentially called get()
+        self.not_empty.acquire()
+        try:
+            self.queue.clear()
+            self.queue.extend(contents)
+            self.not_empty.notifyAll()
+        finally:
+            self.not_empty.release()
+
+Cluster.register('_SettableQueue', SettableQueue)
+
+#
+# Class representing a notional cpu in the cluster
+#
+
+class Slot(object):
+    def __init__(self, host):
+        self.host = host
+        self.Process = host.Process
+
+#
+# Host
+#
+
+class Host(object):
+    '''
+    Represents a host to use as a node in a cluster.
+
+    `hostname` gives the name of the host.  If hostname is not
+    "localhost" then ssh is used to log in to the host.  To log in as
+    a different user use a host name of the form
+    "username@somewhere.org"
+
+    `slots` is used to specify the number of slots for processes on
+    the host.  This affects how often processes will be allocated to
+    this host.  Normally this should be equal to the number of cpus on
+    that host.
+    '''
+    def __init__(self, hostname, slots=None):
+        self.hostname = hostname
+        self.slots = slots
+
+    def _start_manager(self, index, authkey, address, files):
+        if self.hostname != 'localhost':
+            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
+            debug('startup files copied to %s:%s', self.hostname, tempdir)
+            p = subprocess.Popen(
+                ['ssh', self.hostname, 'python', '-c',
+                 '"import os; os.chdir(%r); '
+                 'from distributing import main; main()"' % tempdir],
+                stdin=subprocess.PIPE
+                )
+            data = dict(
+                name='BoostrappingHost', index=index,
+                dist_log_level=_logger.getEffectiveLevel(),
+                dir=tempdir, authkey=str(authkey), parent_address=address
+                )
+            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
+            p.stdin.close()
+
+#
+# Copy files to remote directory, returning name of directory
+#
+
+unzip_code = '''"
+import tempfile, os, sys, tarfile
+tempdir = tempfile.mkdtemp(prefix='distrib-')
+os.chdir(tempdir)
+tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
+for ti in tf:
+    tf.extract(ti)
+print tempdir
+"'''
+
+def copy_to_remote_temporary_directory(host, files):
+    p = subprocess.Popen(
+        ['ssh', host, 'python', '-c', unzip_code],
+        stdout=subprocess.PIPE, stdin=subprocess.PIPE
+        )
+    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
+    for name in files:
+        tf.add(name, os.path.basename(name))
+    tf.close()
+    p.stdin.close()
+    return p.stdout.read().rstrip()
+
+#
+# Code which runs a host manager
+#
+
+def main():
+    # get data from parent over stdin
+    data = pickle.load(sys.stdin)
+    sys.stdin.close()
+
+    # set some stuff
+    _logger.setLevel(data['dist_log_level'])
+    forking.prepare(data)
+
+    # create server for a `HostManager` object
+    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
+    current_process()._server = server
+
+    # report server address and number of cpus back to parent
+    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
+    conn.send((data['index'], server.address, slot_count))
+    conn.close()
+
+    # set name etc
+    current_process().set_name('Host-%s:%s' % server.address)
+    util._run_after_forkers()
+
+    # register a cleanup function
+    def cleanup(directory):
+        debug('removing directory %s', directory)
+        shutil.rmtree(directory)
+        debug('shutting down host manager')
+    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
+
+    # start host manager
+    debug('remote host manager starting in %s', data['dir'])
+    server.serve_forever()