diff --git a/IPython/config/default/ipcluster_config.py b/IPython/config/default/ipcluster_config.py index 26dbd1b..5c2b431 100644 --- a/IPython/config/default/ipcluster_config.py +++ b/IPython/config/default/ipcluster_config.py @@ -23,8 +23,8 @@ c = get_config() # - PBSControllerLauncher # - SGEControllerLauncher # - WindowsHPCControllerLauncher -# c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher' -# c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' +# c.Global.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher' +# c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher' # Options are: # - LocalEngineSetLauncher @@ -32,7 +32,7 @@ c = get_config() # - PBSEngineSetLauncher # - SGEEngineSetLauncher # - WindowsHPCEngineSetLauncher -# c.Global.engine_launcher = 'IPython.parallel.launcher.LocalEngineSetLauncher' +# c.Global.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher' #----------------------------------------------------------------------------- # Global configuration diff --git a/IPython/config/default/ipcontroller_config.py b/IPython/config/default/ipcontroller_config.py index adf2878..492044c 100644 --- a/IPython/config/default/ipcontroller_config.py +++ b/IPython/config/default/ipcontroller_config.py @@ -89,7 +89,7 @@ c = get_config() # Which class to use for the db backend. Currently supported are DictDB (the # default), and MongoDB. Uncomment this line to enable MongoDB, which will # slow-down the Hub's responsiveness, but also reduce its memory footprint. -# c.HubFactory.db_class = 'IPython.parallel.mongodb.MongoDB' +# c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB' # The heartbeat ping frequency. This is the frequency (in ms) at which the # Hub pings engines for heartbeats. This determines how quickly the Hub @@ -144,11 +144,11 @@ c = get_config() # ----- in-memory configuration -------- # this line restores the default behavior: in-memory storage of all results. -# c.HubFactory.db_class = 'IPython.parallel.dictdb.DictDB' +# c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.DictDB' # ----- sqlite configuration -------- # use this line to activate sqlite: -# c.HubFactory.db_class = 'IPython.parallel.sqlitedb.SQLiteDB' +# c.HubFactory.db_class = 'IPython.parallel.controller.sqlitedb.SQLiteDB' # You can specify the name of the db-file. By default, this will be located # in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db @@ -165,7 +165,7 @@ c = get_config() # ----- mongodb configuration -------- # use this line to activate mongodb: -# c.HubFactory.db_class = 'IPython.parallel.mongodb.MongoDB' +# c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB' # You can specify the args and kwargs pymongo will use when creating the Connection. # For more information on what these options might be, see pymongo documentation. diff --git a/IPython/external/ssh/tunnel.py b/IPython/external/ssh/tunnel.py index 4fda9e6..6a11a6b 100644 --- a/IPython/external/ssh/tunnel.py +++ b/IPython/external/ssh/tunnel.py @@ -34,7 +34,7 @@ try: except ImportError: pexpect = None -from IPython.parallel.entry_point import select_random_ports +from IPython.parallel.util import select_random_ports #----------------------------------------------------------------------------- # Code diff --git a/IPython/parallel/__init__.py b/IPython/parallel/__init__.py index c51b9d0..8865179 100644 --- a/IPython/parallel/__init__.py +++ b/IPython/parallel/__init__.py @@ -12,14 +12,15 @@ import zmq -if zmq.__version__ < '2.1.3': - raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.3, you appear to have %s"%zmq.__version__) +if zmq.__version__ < '2.1.4': + raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4, you appear to have %s"%zmq.__version__) -from .asyncresult import * -from .client import Client -from .dependency import * -from .remotefunction import * -from .view import * from IPython.utils.pickleutil import Reference +from .client.asyncresult import * +from .client.client import Client +from .client.remotefunction import * +from .client.view import * +from .controller.dependency import * + diff --git a/IPython/parallel/apps/__init__.py b/IPython/parallel/apps/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/parallel/apps/__init__.py diff --git a/IPython/parallel/clusterdir.py b/IPython/parallel/apps/clusterdir.py similarity index 100% rename from IPython/parallel/clusterdir.py rename to IPython/parallel/apps/clusterdir.py diff --git a/IPython/parallel/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py similarity index 99% rename from IPython/parallel/ipclusterapp.py rename to IPython/parallel/apps/ipclusterapp.py index 7c94f6f..eea0714 100755 --- a/IPython/parallel/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -26,7 +26,7 @@ from zmq.eventloop import ioloop from IPython.external.argparse import ArgumentParser, SUPPRESS from IPython.utils.importstring import import_item -from IPython.parallel.clusterdir import ( +from .clusterdir import ( ApplicationWithClusterDir, ClusterDirConfigLoader, ClusterDirError, PIDFileError ) diff --git a/IPython/parallel/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py similarity index 98% rename from IPython/parallel/ipcontrollerapp.py rename to IPython/parallel/apps/ipcontrollerapp.py index bd40247..069f38e 100755 --- a/IPython/parallel/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -30,9 +30,9 @@ from zmq.log.handlers import PUBHandler from zmq.utils import jsonapi as json from IPython.config.loader import Config + from IPython.parallel import factory -from IPython.parallel.controller import ControllerFactory -from IPython.parallel.clusterdir import ( +from .clusterdir import ( ApplicationWithClusterDir, ClusterDirConfigLoader ) @@ -40,6 +40,7 @@ from IPython.parallel.util import disambiguate_ip_address, split_url # from IPython.kernel.fcutil import FCServiceFactory, FURLError from IPython.utils.traitlets import Instance, Unicode +from IPython.parallel.controller.controller import ControllerFactory #----------------------------------------------------------------------------- @@ -117,11 +118,11 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader): ## Hub Config: paa('--mongodb', dest='HubFactory.db_class', action='store_const', - const='IPython.parallel.mongodb.MongoDB', + const='IPython.parallel.controller.mongodb.MongoDB', help='Use MongoDB for task storage [default: in-memory]') paa('--sqlite', dest='HubFactory.db_class', action='store_const', - const='IPython.parallel.sqlitedb.SQLiteDB', + const='IPython.parallel.controller.sqlitedb.SQLiteDB', help='Use SQLite3 for DB task storage [default: in-memory]') paa('--hb', type=int, dest='HubFactory.hb', nargs=2, diff --git a/IPython/parallel/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py similarity index 97% rename from IPython/parallel/ipengineapp.py rename to IPython/parallel/apps/ipengineapp.py index 3aa1ded..958768d 100755 --- a/IPython/parallel/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -22,16 +22,17 @@ import sys import zmq from zmq.eventloop import ioloop -from IPython.parallel.clusterdir import ( +from .clusterdir import ( ApplicationWithClusterDir, ClusterDirConfigLoader ) from IPython.zmq.log import EnginePUBHandler from IPython.parallel import factory -from IPython.parallel.engine import EngineFactory -from IPython.parallel.streamkernel import Kernel +from IPython.parallel.engine.engine import EngineFactory +from IPython.parallel.engine.streamkernel import Kernel from IPython.parallel.util import disambiguate_url + from IPython.utils.importstring import import_item diff --git a/IPython/parallel/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py similarity index 99% rename from IPython/parallel/iploggerapp.py rename to IPython/parallel/apps/iploggerapp.py index 4661ff3..1023867 100755 --- a/IPython/parallel/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -20,7 +20,7 @@ import sys import zmq -from IPython.parallel.clusterdir import ( +from .clusterdir import ( ApplicationWithClusterDir, ClusterDirConfigLoader ) diff --git a/IPython/parallel/launcher.py b/IPython/parallel/apps/launcher.py similarity index 97% rename from IPython/parallel/launcher.py rename to IPython/parallel/apps/launcher.py index 73cff17..36b8345 100644 --- a/IPython/parallel/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -46,7 +46,7 @@ from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError -from .factory import LoggingFactory +from IPython.parallel.factory import LoggingFactory # load winhpcjob only on Windows try: @@ -64,15 +64,15 @@ except ImportError: ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( - 'IPython.parallel.ipclusterapp' + 'IPython.parallel.apps.ipclusterapp' )) ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( - 'IPython.parallel.ipengineapp' + 'IPython.parallel.apps.ipengineapp' )) ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( - 'IPython.parallel.ipcontrollerapp' + 'IPython.parallel.apps.ipcontrollerapp' )) #----------------------------------------------------------------------------- diff --git a/IPython/parallel/logwatcher.py b/IPython/parallel/apps/logwatcher.py similarity index 98% rename from IPython/parallel/logwatcher.py rename to IPython/parallel/apps/logwatcher.py index 51735f4..adf4355 100644 --- a/IPython/parallel/logwatcher.py +++ b/IPython/parallel/apps/logwatcher.py @@ -21,7 +21,7 @@ from zmq.eventloop import ioloop, zmqstream from IPython.utils.traitlets import Int, Str, Instance, List -from .factory import LoggingFactory +from IPython.parallel.factory import LoggingFactory #----------------------------------------------------------------------------- # Classes diff --git a/IPython/parallel/winhpcjob.py b/IPython/parallel/apps/winhpcjob.py similarity index 100% rename from IPython/parallel/winhpcjob.py rename to IPython/parallel/apps/winhpcjob.py diff --git a/IPython/parallel/client/__init__.py b/IPython/parallel/client/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/parallel/client/__init__.py diff --git a/IPython/parallel/asyncresult.py b/IPython/parallel/client/asyncresult.py similarity index 91% rename from IPython/parallel/asyncresult.py rename to IPython/parallel/client/asyncresult.py index c2f283e..ab87c55 100644 --- a/IPython/parallel/asyncresult.py +++ b/IPython/parallel/client/asyncresult.py @@ -15,7 +15,7 @@ import time from zmq import MessageTracker from IPython.external.decorator import decorator -from . import error +from IPython.parallel import error #----------------------------------------------------------------------------- # Classes diff --git a/IPython/parallel/client.py b/IPython/parallel/client/client.py similarity index 99% rename from IPython/parallel/client.py rename to IPython/parallel/client/client.py index 714e744..e4ff56d 100644 --- a/IPython/parallel/client.py +++ b/IPython/parallel/client/client.py @@ -29,11 +29,12 @@ from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode, from IPython.external.decorator import decorator from IPython.external.ssh import tunnel -from . import error -from . import util -from . import streamsession as ss +from IPython.parallel import error +from IPython.parallel import streamsession as ss +from IPython.parallel import util + from .asyncresult import AsyncResult, AsyncHubResult -from .clusterdir import ClusterDir, ClusterDirError +from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError from .view import DirectView, LoadBalancedView #-------------------------------------------------------------------------- diff --git a/IPython/parallel/map.py b/IPython/parallel/client/map.py similarity index 100% rename from IPython/parallel/map.py rename to IPython/parallel/client/map.py diff --git a/IPython/parallel/remotefunction.py b/IPython/parallel/client/remotefunction.py similarity index 100% rename from IPython/parallel/remotefunction.py rename to IPython/parallel/client/remotefunction.py diff --git a/IPython/parallel/view.py b/IPython/parallel/client/view.py similarity index 92% rename from IPython/parallel/view.py rename to IPython/parallel/client/view.py index 0dfb2b7..61d3fef 100644 --- a/IPython/parallel/view.py +++ b/IPython/parallel/client/view.py @@ -23,10 +23,11 @@ from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, from IPython.external.decorator import decorator +from IPython.parallel import util +from IPython.parallel.controller.dependency import Dependency, dependent + from . import map as Map -from . import util from .asyncresult import AsyncResult, AsyncMapResult -from .dependency import Dependency, dependent from .remotefunction import ParallelFunction, parallel, remote #----------------------------------------------------------------------------- @@ -68,6 +69,7 @@ def spin_after(f, self, *args, **kwargs): # Classes #----------------------------------------------------------------------------- +@testdec.skip_doctest class View(HasTraits): """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. @@ -105,7 +107,7 @@ class View(HasTraits): history=List() outstanding = Set() results = Dict() - client = Instance('IPython.parallel.client.Client') + client = Instance('IPython.parallel.Client') _socket = Instance('zmq.Socket') _flag_names = List(['targets', 'block', 'track']) @@ -386,11 +388,6 @@ class DirectView(View): """sync_imports(local=True) as a property. See sync_imports for details. - - In [10]: with v.importer: - ....: import numpy - ....: - importing numpy on engine(s) """ return self.sync_imports(True) diff --git a/IPython/parallel/controller/__init__.py b/IPython/parallel/controller/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/parallel/controller/__init__.py diff --git a/IPython/parallel/controller.py b/IPython/parallel/controller/controller.py similarity index 98% rename from IPython/parallel/controller.py rename to IPython/parallel/controller/controller.py index 3ba76d3..414d45d 100755 --- a/IPython/parallel/controller.py +++ b/IPython/parallel/controller/controller.py @@ -14,7 +14,6 @@ This is a collection of one Hub and several Schedulers. #----------------------------------------------------------------------------- from __future__ import print_function -import logging from multiprocessing import Process import zmq @@ -23,7 +22,7 @@ from zmq.devices import ProcessMonitoredQueue from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, CStr, Instance, List, Bool -from .entry_point import signal_children +from IPython.parallel.util import signal_children from .hub import Hub, HubFactory from .scheduler import launch_scheduler diff --git a/IPython/parallel/dependency.py b/IPython/parallel/controller/dependency.py similarity index 97% rename from IPython/parallel/dependency.py rename to IPython/parallel/controller/dependency.py index b18ff1a..5727b24 100644 --- a/IPython/parallel/dependency.py +++ b/IPython/parallel/controller/dependency.py @@ -8,9 +8,9 @@ from types import ModuleType -from .asyncresult import AsyncResult -from .error import UnmetDependency -from .util import interactive +from IPython.parallel.client.asyncresult import AsyncResult +from IPython.parallel.error import UnmetDependency +from IPython.parallel.util import interactive class depend(object): """Dependency decorator, for use with tasks. diff --git a/IPython/parallel/dictdb.py b/IPython/parallel/controller/dictdb.py similarity index 100% rename from IPython/parallel/dictdb.py rename to IPython/parallel/controller/dictdb.py diff --git a/IPython/parallel/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py similarity index 91% rename from IPython/parallel/heartmonitor.py rename to IPython/parallel/controller/heartmonitor.py index 65bcce1..31429ca 100644 --- a/IPython/parallel/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -12,15 +12,14 @@ and hearts are tracked based on their XREQ identities. from __future__ import print_function import time -import logging import uuid import zmq -from zmq.devices import ProcessDevice,ThreadDevice +from zmq.devices import ProcessDevice, ThreadDevice from zmq.eventloop import ioloop, zmqstream from IPython.utils.traitlets import Set, Instance, CFloat, Bool -from .factory import LoggingFactory +from IPython.parallel.factory import LoggingFactory class Heart(object): """A basic heart object for responding to a HeartMonitor. diff --git a/IPython/parallel/hub.py b/IPython/parallel/controller/hub.py similarity index 97% rename from IPython/parallel/hub.py rename to IPython/parallel/controller/hub.py index 7e66a27..e32708a 100755 --- a/IPython/parallel/hub.py +++ b/IPython/parallel/controller/hub.py @@ -27,12 +27,11 @@ from zmq.eventloop.zmqstream import ZMQStream from IPython.utils.importstring import import_item from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool -from .entry_point import select_random_ports -from .factory import RegistrationFactory, LoggingFactory +from IPython.parallel import error +from IPython.parallel.factory import RegistrationFactory, LoggingFactory +from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601 -from . import error from .heartmonitor import HeartMonitor -from .util import validate_url_container, ISO8601 #----------------------------------------------------------------------------- # Code @@ -160,11 +159,11 @@ class HubFactory(RegistrationFactory): monitor_url = CStr('') - db_class = CStr('IPython.parallel.dictdb.DictDB', config=True) + db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True) # not configurable - db = Instance('IPython.parallel.dictdb.BaseDB') - heartmonitor = Instance('IPython.parallel.heartmonitor.HeartMonitor') + db = Instance('IPython.parallel.controller.dictdb.BaseDB') + heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor') subconstructors = List() _constructed = Bool(False) diff --git a/IPython/parallel/mongodb.py b/IPython/parallel/controller/mongodb.py similarity index 100% rename from IPython/parallel/mongodb.py rename to IPython/parallel/controller/mongodb.py diff --git a/IPython/parallel/scheduler.py b/IPython/parallel/controller/scheduler.py similarity index 97% rename from IPython/parallel/scheduler.py rename to IPython/parallel/controller/scheduler.py index 6f61794..6e3a3ba 100644 --- a/IPython/parallel/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -36,11 +36,11 @@ from zmq.eventloop import ioloop, zmqstream from IPython.external.decorator import decorator from IPython.utils.traitlets import Instance, Dict, List, Set -from . import error -from .dependency import Dependency -from .entry_point import connect_logger, local_logger -from .factory import SessionFactory +from IPython.parallel import error +from IPython.parallel.factory import SessionFactory +from IPython.parallel.util import connect_logger, local_logger +from .dependency import Dependency @decorator def logged(f,self,*args,**kwargs): diff --git a/IPython/parallel/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py similarity index 99% rename from IPython/parallel/sqlitedb.py rename to IPython/parallel/controller/sqlitedb.py index f7ab22b..8a6bd31 100644 --- a/IPython/parallel/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -17,7 +17,7 @@ from zmq.eventloop import ioloop from IPython.utils.traitlets import CUnicode, CStr, Instance, List from .dictdb import BaseDB -from .util import ISO8601 +from IPython.parallel.util import ISO8601 #----------------------------------------------------------------------------- # SQLite operators, adapters, and converters diff --git a/IPython/parallel/engine/__init__.py b/IPython/parallel/engine/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/parallel/engine/__init__.py diff --git a/IPython/parallel/engine.py b/IPython/parallel/engine/engine.py similarity index 96% rename from IPython/parallel/engine.py rename to IPython/parallel/engine/engine.py index 6f05292..9bbf7a0 100755 --- a/IPython/parallel/engine.py +++ b/IPython/parallel/engine/engine.py @@ -22,11 +22,12 @@ from zmq.eventloop import ioloop, zmqstream from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat # from IPython.utils.localinterfaces import LOCALHOST -from . import heartmonitor -from .factory import RegistrationFactory +from IPython.parallel.controller.heartmonitor import Heart +from IPython.parallel.factory import RegistrationFactory +from IPython.parallel.streamsession import Message +from IPython.parallel.util import disambiguate_url + from .streamkernel import Kernel -from .streamsession import Message -from .util import disambiguate_url class EngineFactory(RegistrationFactory): """IPython engine""" @@ -129,7 +130,7 @@ class EngineFactory(RegistrationFactory): loop=loop, user_ns = self.user_ns, logname=self.log.name) self.kernel.start() hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ] - heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) + heart = Heart(*map(str, hb_addrs), heart_id=identity) # ioloop.DelayedCallback(heart.start, 1000, self.loop).start() heart.start() diff --git a/IPython/parallel/kernelstarter.py b/IPython/parallel/engine/kernelstarter.py similarity index 99% rename from IPython/parallel/kernelstarter.py rename to IPython/parallel/engine/kernelstarter.py index 416df17..3395e4f 100644 --- a/IPython/parallel/kernelstarter.py +++ b/IPython/parallel/engine/kernelstarter.py @@ -8,7 +8,7 @@ from zmq.eventloop import ioloop -from .streamsession import StreamSession +from IPython.parallel.streamsession import StreamSession class KernelStarter(object): """Object for resetting/killing the Kernel.""" diff --git a/IPython/parallel/streamkernel.py b/IPython/parallel/engine/streamkernel.py similarity index 87% rename from IPython/parallel/streamkernel.py rename to IPython/parallel/engine/streamkernel.py index 9f7a5c9..cabf1d4 100755 --- a/IPython/parallel/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -22,25 +22,18 @@ import time from code import CommandCompiler from datetime import datetime from pprint import pprint -from signal import SIGTERM, SIGKILL # System library imports. import zmq from zmq.eventloop import ioloop, zmqstream # Local imports. -from IPython.core import ultratb from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str from IPython.zmq.completer import KernelCompleter -from IPython.zmq.iostream import OutStream -from IPython.zmq.displayhook import DisplayHook -from . import heartmonitor -from .client import Client -from .error import wrap_exception -from .factory import SessionFactory -from .streamsession import StreamSession -from .util import serialize_object, unpack_apply_message, ISO8601, Namespace +from IPython.parallel.error import wrap_exception +from IPython.parallel.factory import SessionFactory +from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601 def printer(*args): pprint(args, stream=sys.__stdout__) @@ -71,7 +64,7 @@ class Kernel(SessionFactory): control_stream = Instance(zmqstream.ZMQStream) task_stream = Instance(zmqstream.ZMQStream) iopub_stream = Instance(zmqstream.ZMQStream) - client = Instance('IPython.parallel.client.Client') + client = Instance('IPython.parallel.Client') # internals shell_streams = List() @@ -428,62 +421,3 @@ class Kernel(SessionFactory): # # don't busywait # time.sleep(1e-3) -def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs, - client_addr=None, loop=None, context=None, key=None, - out_stream_factory=OutStream, display_hook_factory=DisplayHook): - """NO LONGER IN USE""" - # create loop, context, and session: - if loop is None: - loop = ioloop.IOLoop.instance() - if context is None: - context = zmq.Context() - c = context - session = StreamSession(key=key) - # print (session.key) - # print (control_addr, shell_addrs, iopub_addr, hb_addrs) - - # create Control Stream - control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) - control_stream.setsockopt(zmq.IDENTITY, identity) - control_stream.connect(control_addr) - - # create Shell Streams (MUX, Task, etc.): - shell_streams = [] - for addr in shell_addrs: - stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) - stream.setsockopt(zmq.IDENTITY, identity) - stream.connect(addr) - shell_streams.append(stream) - - # create iopub stream: - iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) - iopub_stream.setsockopt(zmq.IDENTITY, identity) - iopub_stream.connect(iopub_addr) - - # Redirect input streams and set a display hook. - if out_stream_factory: - sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') - sys.stdout.topic = 'engine.%i.stdout'%int_id - sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') - sys.stderr.topic = 'engine.%i.stderr'%int_id - if display_hook_factory: - sys.displayhook = display_hook_factory(session, iopub_stream) - sys.displayhook.topic = 'engine.%i.pyout'%int_id - - - # launch heartbeat - heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) - heart.start() - - # create (optional) Client - if client_addr: - client = Client(client_addr, username=identity) - else: - client = None - - kernel = Kernel(id=int_id, session=session, control_stream=control_stream, - shell_streams=shell_streams, iopub_stream=iopub_stream, - client=client, loop=loop) - kernel.start() - return loop, c, kernel - diff --git a/IPython/parallel/entry_point.py b/IPython/parallel/entry_point.py deleted file mode 100644 index 193b807..0000000 --- a/IPython/parallel/entry_point.py +++ /dev/null @@ -1,125 +0,0 @@ -""" Defines helper functions for creating kernel entry points and process -launchers. - -************ -NOTE: Most of this module has been deprecated by moving to Configurables -************ -""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -# Standard library imports. -import atexit -import logging -import os -import stat -import socket -import sys -from signal import signal, SIGINT, SIGABRT, SIGTERM -from subprocess import Popen, PIPE -try: - from signal import SIGKILL -except ImportError: - SIGKILL=None - -# System library imports. -import zmq -from zmq.log import handlers - -# Local imports. -from IPython.core.ultratb import FormattedTB -from IPython.external.argparse import ArgumentParser -from IPython.zmq.log import EnginePUBHandler - -_random_ports = set() - -def select_random_ports(n): - """Selects and return n random ports that are available.""" - ports = [] - for i in xrange(n): - sock = socket.socket() - sock.bind(('', 0)) - while sock.getsockname()[1] in _random_ports: - sock.close() - sock = socket.socket() - sock.bind(('', 0)) - ports.append(sock) - for i, sock in enumerate(ports): - port = sock.getsockname()[1] - sock.close() - ports[i] = port - _random_ports.add(port) - return ports - -def signal_children(children): - """Relay interupt/term signals to children, for more solid process cleanup.""" - def terminate_children(sig, frame): - logging.critical("Got signal %i, terminating children..."%sig) - for child in children: - child.terminate() - - sys.exit(sig != SIGINT) - # sys.exit(sig) - for sig in (SIGINT, SIGABRT, SIGTERM): - signal(sig, terminate_children) - -def generate_exec_key(keyfile): - import uuid - newkey = str(uuid.uuid4()) - with open(keyfile, 'w') as f: - # f.write('ipython-key ') - f.write(newkey+'\n') - # set user-only RW permissions (0600) - # this will have no effect on Windows - os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) - - -def integer_loglevel(loglevel): - try: - loglevel = int(loglevel) - except ValueError: - if isinstance(loglevel, str): - loglevel = getattr(logging, loglevel) - return loglevel - -def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG): - logger = logging.getLogger(logname) - if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): - # don't add a second PUBHandler - return - loglevel = integer_loglevel(loglevel) - lsock = context.socket(zmq.PUB) - lsock.connect(iface) - handler = handlers.PUBHandler(lsock) - handler.setLevel(loglevel) - handler.root_topic = root - logger.addHandler(handler) - logger.setLevel(loglevel) - -def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): - logger = logging.getLogger() - if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): - # don't add a second PUBHandler - return - loglevel = integer_loglevel(loglevel) - lsock = context.socket(zmq.PUB) - lsock.connect(iface) - handler = EnginePUBHandler(engine, lsock) - handler.setLevel(loglevel) - logger.addHandler(handler) - logger.setLevel(loglevel) - -def local_logger(logname, loglevel=logging.DEBUG): - loglevel = integer_loglevel(loglevel) - logger = logging.getLogger(logname) - if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]): - # don't add a second StreamHandler - return - handler = logging.StreamHandler() - handler.setLevel(loglevel) - logger.addHandler(handler) - logger.setLevel(loglevel) diff --git a/IPython/parallel/factory.py b/IPython/parallel/factory.py index 7c9da9a..4cd1ffc 100644 --- a/IPython/parallel/factory.py +++ b/IPython/parallel/factory.py @@ -23,7 +23,7 @@ from IPython.utils.importstring import import_item from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr import IPython.parallel.streamsession as ss -from IPython.parallel.entry_point import select_random_ports +from IPython.parallel.util import select_random_ports #----------------------------------------------------------------------------- # Classes diff --git a/IPython/parallel/ipcluster.py b/IPython/parallel/ipcluster.py deleted file mode 100644 index 1da18c8..0000000 --- a/IPython/parallel/ipcluster.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -"""Old ipcluster script. Possibly to be removed.""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- -from __future__ import print_function - -import os -import sys -import time -from subprocess import Popen, PIPE - -from IPython.external.argparse import ArgumentParser, SUPPRESS - -def _filter_arg(flag, args): - filtered = [] - if flag in args: - filtered.append(flag) - idx = args.index(flag) - if len(args) > idx+1: - if not args[idx+1].startswith('-'): - filtered.append(args[idx+1]) - return filtered - -def filter_args(flags, args=sys.argv[1:]): - filtered = [] - for flag in flags: - if isinstance(flag, (list,tuple)): - for f in flag: - filtered.extend(_filter_arg(f, args)) - else: - filtered.extend(_filter_arg(flag, args)) - return filtered - -def _strip_arg(flag, args): - while flag in args: - idx = args.index(flag) - args.pop(idx) - if len(args) > idx: - if not args[idx].startswith('-'): - args.pop(idx) - -def strip_args(flags, args=sys.argv[1:]): - args = list(args) - for flag in flags: - if isinstance(flag, (list,tuple)): - for f in flag: - _strip_arg(f, args) - else: - _strip_arg(flag, args) - return args - - -def launch_process(mod, args): - """Launch a controller or engine in a subprocess.""" - code = "from IPython.parallel.%s import launch_new_instance;launch_new_instance()"%mod - arguments = [ sys.executable, '-c', code ] + args - blackholew = file(os.devnull, 'w') - blackholer = file(os.devnull, 'r') - - proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE) - return proc - -def main(): - parser = ArgumentParser(argument_default=SUPPRESS) - parser.add_argument('--n', '-n', type=int, default=1, - help="The number of engines to start.") - ns,args = parser.parse_known_args() - n = ns.n - - controller = launch_process('ipcontrollerapp', args) - for i in range(10): - time.sleep(.1) - if controller.poll() is not None: - print("Controller failed to launch:") - print (controller.stderr.read()) - sys.exit(255) - - print("Launched Controller") - engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ] - print("%i Engines started"%n) - - def wait_quietly(p): - try: - p.wait() - except KeyboardInterrupt: - pass - - wait_quietly(controller) - map(wait_quietly, engines) - print ("Engines cleaned up.") - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/IPython/parallel/remotenamespace.py b/IPython/parallel/remotenamespace.py deleted file mode 100644 index 1c1cb4a..0000000 --- a/IPython/parallel/remotenamespace.py +++ /dev/null @@ -1,101 +0,0 @@ -"""RemoteNamespace object, for dict style interaction with a remote -execution kernel.""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -from functools import wraps -from IPython.external.decorator import decorator - -def _clear(): - globals().clear() - -@decorator -def spinfirst(f): - @wraps(f) - def spun_method(self, *args, **kwargs): - self.spin() - return f(self, *args, **kwargs) - return spun_method - -@decorator -def myblock(f, self, *args, **kwargs): - block = self.client.block - self.client.block = self.block - ret = f(self, *args, **kwargs) - self.client.block = block - return ret - -class RemoteNamespace(object): - """A RemoteNamespace object, providing dictionary - access to an engine via an IPython.zmq.client object. - - - """ - client = None - queue = None - id = None - block = False - - def __init__(self, client, id): - self.client = client - self.id = id - self.block = client.block # initial state is same as client - - def __repr__(self): - return ""%self.id - - @myblock - def apply(self, f, *args, **kwargs): - """call f(*args, **kwargs) in remote namespace - - This method has no access to the user namespace""" - return self.client.apply_to(self.id, f, *args, **kwargs) - - @myblock - def apply_bound(self, f, *args, **kwargs): - """call `f(*args, **kwargs)` in remote namespace. - - `f` will have access to the user namespace as globals().""" - return self.client.apply_bound_to(self.id, f, *args, **kwargs) - - @myblock - def update(self, ns): - """update remote namespace with dict `ns`""" - return self.client.push(self.id, ns, self.block) - - def get(self, key_s): - """get object(s) by `key_s` from remote namespace - will return one object if it is a key. - It also takes a list of keys, and will return a list of objects.""" - return self.client.pull(self.id, key_s, self.block) - - push = update - pull = get - - def __getitem__(self, key): - return self.get(key) - - def __setitem__(self,key,value): - self.update({key:value}) - - def clear(self): - """clear the remote namespace""" - return self.client.apply_bound_to(self.id, _clear) - - @decorator - def withme(self, toapply): - """for use as a decorator, this turns a function into - one that executes remotely.""" - @wraps(toapply) - def applied(self, *args, **kwargs): - return self.apply_bound(self, toapply, *args, **kwargs) - return applied - - - - - diff --git a/IPython/parallel/scheduler/__init__.py b/IPython/parallel/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/parallel/scheduler/__init__.py diff --git a/IPython/parallel/scripts/ipcluster b/IPython/parallel/scripts/ipcluster index 743a510..066a343 100755 --- a/IPython/parallel/scripts/ipcluster +++ b/IPython/parallel/scripts/ipcluster @@ -13,6 +13,6 @@ #----------------------------------------------------------------------------- -from IPython.parallel.ipclusterapp import launch_new_instance +from IPython.parallel.apps.ipclusterapp import launch_new_instance launch_new_instance() diff --git a/IPython/parallel/scripts/ipcontroller b/IPython/parallel/scripts/ipcontroller index 1556dff..1939d96 100755 --- a/IPython/parallel/scripts/ipcontroller +++ b/IPython/parallel/scripts/ipcontroller @@ -13,6 +13,6 @@ #----------------------------------------------------------------------------- -from IPython.parallel.ipcontrollerapp import launch_new_instance +from IPython.parallel.apps.ipcontrollerapp import launch_new_instance launch_new_instance() diff --git a/IPython/parallel/scripts/ipengine b/IPython/parallel/scripts/ipengine index 56f4649..9c178a4 100755 --- a/IPython/parallel/scripts/ipengine +++ b/IPython/parallel/scripts/ipengine @@ -13,7 +13,7 @@ #----------------------------------------------------------------------------- -from IPython.parallel.ipengineapp import launch_new_instance +from IPython.parallel.apps.ipengineapp import launch_new_instance launch_new_instance() diff --git a/IPython/parallel/scripts/iplogger b/IPython/parallel/scripts/iplogger index 97d1fa3..1ae833a 100755 --- a/IPython/parallel/scripts/iplogger +++ b/IPython/parallel/scripts/iplogger @@ -13,7 +13,7 @@ #----------------------------------------------------------------------------- -from IPython.parallel.iploggerapp import launch_new_instance +from IPython.parallel.apps.iploggerapp import launch_new_instance launch_new_instance() diff --git a/IPython/parallel/taskthread.py b/IPython/parallel/taskthread.py deleted file mode 100644 index 2a80701..0000000 --- a/IPython/parallel/taskthread.py +++ /dev/null @@ -1,106 +0,0 @@ -"""Thread for popping Tasks from zmq to Python Queue""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - - -import time -from threading import Thread - -try: - from queue import Queue -except: - from Queue import Queue - -import zmq -from zmq.core.poll import _poll as poll -from zmq.devices import ThreadDevice -from IPython.parallel import streamsession as ss - - -class QueueStream(object): - def __init__(self, in_queue, out_queue): - self.in_queue = in_queue - self.out_queue = out_queue - - def send_multipart(self, *args, **kwargs): - while self.out_queue.full(): - time.sleep(1e-3) - self.out_queue.put(('send_multipart', args, kwargs)) - - def send(self, *args, **kwargs): - while self.out_queue.full(): - time.sleep(1e-3) - self.out_queue.put(('send', args, kwargs)) - - def recv_multipart(self): - return self.in_queue.get() - - def empty(self): - return self.in_queue.empty() - -class TaskThread(ThreadDevice): - """Class for popping Tasks from C-ZMQ->Python Queue""" - max_qsize = 100 - in_socket = None - out_socket = None - # queue = None - - def __init__(self, queue_type, mon_type, engine_id, max_qsize=100): - ThreadDevice.__init__(self, 0, queue_type, mon_type) - self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id) - self.engine_id = engine_id - self.in_queue = Queue(max_qsize) - self.out_queue = Queue(max_qsize) - self.max_qsize = max_qsize - - @property - def queues(self): - return self.in_queue, self.out_queue - - @property - def can_recv(self): - # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3) - return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 ) - - @property - def can_send(self): - return not self.out_queue.empty() - - def run(self): - print 'running' - self.queue_socket,self.mon_socket = self._setup_sockets() - print 'setup' - - while True: - while not self.can_send and not self.can_recv: - # print 'idle' - # nothing to do, wait - time.sleep(1e-3) - while self.can_send: - # flush out queue - print 'flushing...' - meth, args, kwargs = self.out_queue.get() - getattr(self.queue_socket, meth)(*args, **kwargs) - print 'flushed' - - if self.can_recv: - print 'recving' - # get another job from zmq - msg = self.queue_socket.recv_multipart(0, copy=False) - # put it in the Queue - self.in_queue.put(msg) - idents,msg = self.session.feed_identities(msg, copy=False) - msg = self.session.unpack_message(msg, content=False, copy=False) - # notify the Controller that we got it - self.mon_socket.send('tracktask', zmq.SNDMORE) - header = msg['header'] - msg_id = header['msg_id'] - content = dict(engine_id=self.engine_id, msg_id = msg_id) - self.session.send(self.mon_socket, 'task_receipt', content=content) - print 'recvd' - - \ No newline at end of file diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 8299cf2..f3a28ae 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -15,7 +15,7 @@ import tempfile import time from subprocess import Popen, PIPE, STDOUT -from IPython.parallel import client +from IPython.parallel import Client processes = [] blackhole = tempfile.TemporaryFile() @@ -27,14 +27,14 @@ def setup(): processes.append(cp) time.sleep(.5) add_engines(1) - c = client.Client(profile='iptest') + c = Client(profile='iptest') while not c.ids: time.sleep(.1) c.spin() c.close() def add_engines(n=1, profile='iptest'): - rc = client.Client(profile=profile) + rc = Client(profile=profile) base = len(rc) eps = [] for i in range(n): diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index bbfb68e..6e48079 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -10,8 +10,6 @@ import sys import tempfile import time -from signal import SIGINT -from multiprocessing import Process from nose import SkipTest @@ -21,9 +19,7 @@ from zmq.tests import BaseZMQTestCase from IPython.external.decorator import decorator from IPython.parallel import error -from IPython.parallel.client import Client -from IPython.parallel.ipcluster import launch_process -from IPython.parallel.entry_point import select_random_ports +from IPython.parallel import Client from IPython.parallel.tests import processes,add_engines # simple tasks for use in apply tests diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index e3a213a..b2d1311 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -16,10 +16,10 @@ from tempfile import mktemp import zmq -from IPython.parallel import client as clientmod +from IPython.parallel.client import client as clientmod from IPython.parallel import error -from IPython.parallel.asyncresult import AsyncResult, AsyncHubResult -from IPython.parallel.view import LoadBalancedView, DirectView +from IPython.parallel import AsyncResult, AsyncHubResult +from IPython.parallel import LoadBalancedView, DirectView from clienttest import ClusterTestCase, segfault, wait, add_engines diff --git a/IPython/parallel/tests/test_dependency.py b/IPython/parallel/tests/test_dependency.py index 9773e93..9fe1604 100644 --- a/IPython/parallel/tests/test_dependency.py +++ b/IPython/parallel/tests/test_dependency.py @@ -18,7 +18,7 @@ import os from IPython.utils.pickleutil import can, uncan -from IPython.parallel import dependency as dmod +import IPython.parallel as pmod from IPython.parallel.util import interactive from IPython.parallel.tests import add_engines @@ -27,7 +27,7 @@ from .clienttest import ClusterTestCase def setup(): add_engines(1) -@dmod.require('time') +@pmod.require('time') def wait(n): time.sleep(n) return n @@ -65,7 +65,7 @@ class DependencyTest(ClusterTestCase): def test_require_imports(self): """test that @require imports names""" @self.cancan - @dmod.require('urllib') + @pmod.require('urllib') @interactive def encode(dikt): return urllib.urlencode(dikt) @@ -73,13 +73,13 @@ class DependencyTest(ClusterTestCase): self.assertEquals(encode(dict(a=5)), 'a=5') def test_success_only(self): - dep = dmod.Dependency(mixed, success=True, failure=False) + dep = pmod.Dependency(mixed, success=True, failure=False) self.assertUnmet(dep) self.assertUnreachable(dep) dep.all=False self.assertMet(dep) self.assertReachable(dep) - dep = dmod.Dependency(completed, success=True, failure=False) + dep = pmod.Dependency(completed, success=True, failure=False) self.assertMet(dep) self.assertReachable(dep) dep.all=False @@ -87,13 +87,13 @@ class DependencyTest(ClusterTestCase): self.assertReachable(dep) def test_failure_only(self): - dep = dmod.Dependency(mixed, success=False, failure=True) + dep = pmod.Dependency(mixed, success=False, failure=True) self.assertUnmet(dep) self.assertUnreachable(dep) dep.all=False self.assertMet(dep) self.assertReachable(dep) - dep = dmod.Dependency(completed, success=False, failure=True) + dep = pmod.Dependency(completed, success=False, failure=True) self.assertUnmet(dep) self.assertUnreachable(dep) dep.all=False diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index 50781ba..2714909 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -17,8 +17,8 @@ import zmq from IPython import parallel as pmod from IPython.parallel import error -from IPython.parallel.asyncresult import AsyncResult, AsyncHubResult, AsyncMapResult -from IPython.parallel.view import LoadBalancedView, DirectView +from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult +from IPython.parallel import LoadBalancedView, DirectView from IPython.parallel.util import interactive from IPython.parallel.tests import add_engines diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index 03f1251..dc00ed4 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -10,8 +10,18 @@ # Imports #----------------------------------------------------------------------------- +# Standard library imports. +import logging +import os import re +import stat import socket +import sys +from signal import signal, SIGINT, SIGABRT, SIGTERM +try: + from signal import SIGKILL +except ImportError: + SIGKILL=None try: import cPickle @@ -20,10 +30,16 @@ except: cPickle = None import pickle +# System library imports +import zmq +from zmq.log import handlers +# IPython imports from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence from IPython.utils.newserialized import serialize, unserialize +from IPython.zmq.log import EnginePUBHandler +# globals ISO8601="%Y-%m-%dT%H:%M:%S.%f" #----------------------------------------------------------------------------- @@ -352,3 +368,95 @@ def _execute(code): """helper method for implementing `client.execute` via `client.apply`""" exec code in globals() +#-------------------------------------------------------------------------- +# extra process management utilities +#-------------------------------------------------------------------------- + +_random_ports = set() + +def select_random_ports(n): + """Selects and return n random ports that are available.""" + ports = [] + for i in xrange(n): + sock = socket.socket() + sock.bind(('', 0)) + while sock.getsockname()[1] in _random_ports: + sock.close() + sock = socket.socket() + sock.bind(('', 0)) + ports.append(sock) + for i, sock in enumerate(ports): + port = sock.getsockname()[1] + sock.close() + ports[i] = port + _random_ports.add(port) + return ports + +def signal_children(children): + """Relay interupt/term signals to children, for more solid process cleanup.""" + def terminate_children(sig, frame): + logging.critical("Got signal %i, terminating children..."%sig) + for child in children: + child.terminate() + + sys.exit(sig != SIGINT) + # sys.exit(sig) + for sig in (SIGINT, SIGABRT, SIGTERM): + signal(sig, terminate_children) + +def generate_exec_key(keyfile): + import uuid + newkey = str(uuid.uuid4()) + with open(keyfile, 'w') as f: + # f.write('ipython-key ') + f.write(newkey+'\n') + # set user-only RW permissions (0600) + # this will have no effect on Windows + os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) + + +def integer_loglevel(loglevel): + try: + loglevel = int(loglevel) + except ValueError: + if isinstance(loglevel, str): + loglevel = getattr(logging, loglevel) + return loglevel + +def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG): + logger = logging.getLogger(logname) + if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): + # don't add a second PUBHandler + return + loglevel = integer_loglevel(loglevel) + lsock = context.socket(zmq.PUB) + lsock.connect(iface) + handler = handlers.PUBHandler(lsock) + handler.setLevel(loglevel) + handler.root_topic = root + logger.addHandler(handler) + logger.setLevel(loglevel) + +def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): + logger = logging.getLogger() + if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): + # don't add a second PUBHandler + return + loglevel = integer_loglevel(loglevel) + lsock = context.socket(zmq.PUB) + lsock.connect(iface) + handler = EnginePUBHandler(engine, lsock) + handler.setLevel(loglevel) + logger.addHandler(handler) + logger.setLevel(loglevel) + +def local_logger(logname, loglevel=logging.DEBUG): + loglevel = integer_loglevel(loglevel) + logger = logging.getLogger(logname) + if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]): + # don't add a second StreamHandler + return + handler = logging.StreamHandler() + handler.setLevel(loglevel) + logger.addHandler(handler) + logger.setLevel(loglevel) diff --git a/IPython/testing/iptest.py b/IPython/testing/iptest.py index 9d00571..9b70214 100644 --- a/IPython/testing/iptest.py +++ b/IPython/testing/iptest.py @@ -282,7 +282,10 @@ def make_runners(): # Packages to be tested via nose, that only depend on the stdlib nose_pkg_names = ['config', 'core', 'extensions', 'frontend', 'lib', 'scripts', 'testing', 'utils' ] - + + if have['zmq']: + nose_pkg_names.append('parallel') + # For debugging this code, only load quick stuff #nose_pkg_names = ['core', 'extensions'] # dbg diff --git a/IPython/utils/pickleutil.py b/IPython/utils/pickleutil.py index e09b73a..881496a 100644 --- a/IPython/utils/pickleutil.py +++ b/IPython/utils/pickleutil.py @@ -93,7 +93,7 @@ class CannedFunction(CannedObject): def can(obj): # import here to prevent module-level circular imports - from IPython.parallel.dependency import dependent + from IPython.parallel import dependent if isinstance(obj, dependent): keys = ('f','df') return CannedObject(obj, keys=keys) diff --git a/docs/source/parallel/parallel_details.txt b/docs/source/parallel/parallel_details.txt index bebf85d..ef8d369 100644 --- a/docs/source/parallel/parallel_details.txt +++ b/docs/source/parallel/parallel_details.txt @@ -241,7 +241,7 @@ Views ===== The principal extension of the :class:`~parallel.Client` is the -:class:`~parallel.view.View` class. The client +:class:`~parallel.View` class. The client DirectView diff --git a/docs/source/parallel/parallel_intro.txt b/docs/source/parallel/parallel_intro.txt index 093dac9..507b047 100644 --- a/docs/source/parallel/parallel_intro.txt +++ b/docs/source/parallel/parallel_intro.txt @@ -128,7 +128,7 @@ IPython client and views ------------------------ There is one primary object, the :class:`~.parallel.Client`, for connecting to a cluster. -For each execution model, there is a corresponding :class:`~.parallel.view.View`. These views +For each execution model, there is a corresponding :class:`~.parallel.View`. These views allow users to interact with a set of engines through the interface. Here are the two default views: diff --git a/docs/source/parallel/parallel_multiengine.txt b/docs/source/parallel/parallel_multiengine.txt index 9d879f5..b25bc9f 100644 --- a/docs/source/parallel/parallel_multiengine.txt +++ b/docs/source/parallel/parallel_multiengine.txt @@ -431,7 +431,7 @@ on the engines given by the :attr:`targets` attribute: Type %autopx to disable In [32]: max_evals = [] - + In [33]: for i in range(100): ....: a = numpy.random.rand(10,10) @@ -440,7 +440,7 @@ on the engines given by the :attr:`targets` attribute: ....: max_evals.append(evals[0].real) ....: ....: - + In [34]: %autopx Auto Parallel Disabled diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index 3375555..f02ebf8 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -140,7 +140,7 @@ There, instruct ipcluster to use the MPIExec launchers by adding the lines: .. sourcecode:: python - c.Global.engine_launcher = 'IPython.parallel.launcher.MPIExecEngineSetLauncher' + c.Global.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher' If the default MPI configuration is correct, then you can now start your cluster, with:: @@ -155,7 +155,7 @@ If you have a reason to also start the Controller with mpi, you can specify: .. sourcecode:: python - c.Global.controller_launcher = 'IPython.parallel.launcher.MPIExecControllerLauncher' + c.Global.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher' .. note:: @@ -196,8 +196,8 @@ and engines: .. sourcecode:: python - c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' - c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher' + c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher' + c.Global.engine_launcher = 'IPython.parallel.apps.launcher.PBSEngineSetLauncher' IPython does provide simple default batch templates for PBS and SGE, but you may need to specify your own. Here is a sample PBS script template: @@ -318,9 +318,9 @@ To use this mode, select the SSH launchers in :file:`ipcluster_config.py`: .. sourcecode:: python - c.Global.engine_launcher = 'IPython.parallel.launcher.SSHEngineSetLauncher' + c.Global.engine_launcher = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher' # and if the Controller is also to be remote: - c.Global.controller_launcher = 'IPython.parallel.launcher.SSHControllerLauncher' + c.Global.controller_launcher = 'IPython.parallel.apps.launcher.SSHControllerLauncher' The controller's remote location and configuration can be specified: diff --git a/docs/source/parallel/parallel_task.txt b/docs/source/parallel/parallel_task.txt index 326dc25..82d71f7 100644 --- a/docs/source/parallel/parallel_task.txt +++ b/docs/source/parallel/parallel_task.txt @@ -32,7 +32,7 @@ our :ref:`introduction ` to using IPython for parallel computing. Creating a ``Client`` instance ============================== -The first step is to import the IPython :mod:`IPython.parallel.client` +The first step is to import the IPython :mod:`IPython.parallel` module and then create a :class:`.Client` instance, and we will also be using a :class:`LoadBalancedView`, here called `lview`: @@ -145,7 +145,7 @@ There are two decorators and a class used for functional dependencies: .. sourcecode:: ipython - In [9]: from IPython.parallel.dependency import depend, require, dependent + In [9]: from IPython.parallel import depend, require, dependent @require ******** @@ -398,10 +398,10 @@ The :class:`LoadBalancedView` has many more powerful features that allow quite a of flexibility in how tasks are defined and run. The next places to look are in the following classes: -* :class:`IPython.parallel.view.LoadBalancedView` -* :class:`IPython.parallel.asyncresult.AsyncResult` -* :meth:`IPython.parallel.view.LoadBalancedView.apply` -* :mod:`IPython.parallel.dependency` +* :class:`~IPython.parallel.client.view.LoadBalancedView` +* :class:`~IPython.parallel.client.asyncresult.AsyncResult` +* :meth:`~IPython.parallel.client.view.LoadBalancedView.apply` +* :mod:`~IPython.parallel.controller.dependency` The following is an overview of how to use these classes together: diff --git a/docs/source/parallel/parallel_transition.txt b/docs/source/parallel/parallel_transition.txt index 84089a2..06e268e 100644 --- a/docs/source/parallel/parallel_transition.txt +++ b/docs/source/parallel/parallel_transition.txt @@ -1,8 +1,8 @@ .. _parallel_transition: -============================================================ -Transitioning from IPython.kernel to IPython.zmq.newparallel -============================================================ +===================================================== +Transitioning from IPython.kernel to IPython.parallel +===================================================== We have rewritten our parallel computing tools to use 0MQ_ and Tornado_. The redesign @@ -39,8 +39,8 @@ Creating a Client Creating a client with default settings has not changed much, though the extended options have. One significant change is that there are no longer multiple Client classes to represent the various execution models. There is just one low-level Client object for connecting to the -cluster, and View objects are created from that Client that provide the different interfaces -for execution. +cluster, and View objects are created from that Client that provide the different interfaces for +execution. To create a new client, and set up the default direct and load-balanced objects: @@ -124,8 +124,6 @@ argument. DirectView. - - The other major difference is the use of :meth:`apply`. When remote work is simply functions, the natural return value is the actual Python objects. It is no longer the recommended pattern to use stdout as your results, due to stream decoupling and the asynchronous nature of how the @@ -203,6 +201,25 @@ the engine beyond the duration of the task. LoadBalancedView. +PendingResults to AsyncResults +------------------------------ + +With the departure from Twisted, we no longer have the :class:`Deferred` class for representing +unfinished results. For this, we have an AsyncResult object, based on the object of the same +name in the built-in :mod:`multiprocessing.pool` module. Our version provides a superset of that +interface. + +However, unlike in IPython.kernel, we do not have PendingDeferred, PendingResult, or TaskResult +objects. Simply this one object, the AsyncResult. Every asynchronous (`block=False`) call +returns one. + +The basic methods of an AsyncResult are: + +.. sourcecode:: python + + AsyncResult.wait([timeout]): # wait for the result to arrive + AsyncResult.get([timeout]): # wait for the result to arrive, and then return it + AsyncResult.metadata: # dict of extra information about execution. There are still some things that behave the same as IPython.kernel: @@ -218,4 +235,11 @@ There are still some things that behave the same as IPython.kernel: In [6]: ar.r Out[6]: [5, 5] +The ``.r`` or ``.result`` property simply calls :meth:`get`, waiting for and returning the +result. + +.. seealso:: + + :ref:`AsyncResult details ` + diff --git a/docs/source/parallel/parallel_winhpc.txt b/docs/source/parallel/parallel_winhpc.txt index 0aac695..44ea429 100644 --- a/docs/source/parallel/parallel_winhpc.txt +++ b/docs/source/parallel/parallel_winhpc.txt @@ -233,9 +233,9 @@ will need to edit the following attributes in the file # Set these at the top of the file to tell ipcluster to use the # Windows HPC job scheduler. c.Global.controller_launcher = \ - 'IPython.parallel.launcher.WindowsHPCControllerLauncher' + 'IPython.parallel.apps.launcher.WindowsHPCControllerLauncher' c.Global.engine_launcher = \ - 'IPython.parallel.launcher.WindowsHPCEngineSetLauncher' + 'IPython.parallel.apps.launcher.WindowsHPCEngineSetLauncher' # Set these to the host name of the scheduler (head node) of your cluster. c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE' diff --git a/setup.py b/setup.py index c928ab8..ace1149 100755 --- a/setup.py +++ b/setup.py @@ -215,19 +215,19 @@ if 'setuptools' in sys.modules: 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance', 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main', 'pycolor = IPython.utils.PyColorize:main', - 'ipcontroller = IPython.parallel.ipcontrollerapp:launch_new_instance', - 'ipengine = IPython.parallel.ipengineapp:launch_new_instance', - 'iplogger = IPython.parallel.iploggerapp:launch_new_instance', - 'ipcluster = IPython.parallel.ipclusterapp:launch_new_instance', + 'ipcontroller = IPython.parallel.apps.ipcontrollerapp:launch_new_instance', + 'ipengine = IPython.parallel.apps.ipengineapp:launch_new_instance', + 'iplogger = IPython.parallel.apps.iploggerapp:launch_new_instance', + 'ipcluster = IPython.parallel.apps.ipclusterapp:launch_new_instance', 'iptest = IPython.testing.iptest:main', 'irunner = IPython.lib.irunner:main' ] } setup_args['extras_require'] = dict( + parallel = 'pyzmq>=2.1.4', zmq = 'pyzmq>=2.0.10.1', doc='Sphinx>=0.3', test='nose>=0.10.1', - security='pyOpenSSL>=0.6' ) else: # If we are running without setuptools, call this function which will diff --git a/setupbase.py b/setupbase.py index 683a3a6..6ebdfc0 100644 --- a/setupbase.py +++ b/setupbase.py @@ -127,7 +127,8 @@ def find_packages(): add_package(packages, 'frontend.qt.console', tests=True) add_package(packages, 'frontend.terminal', tests=True) add_package(packages, 'lib', tests=True) - add_package(packages, 'parallel', tests=True) + add_package(packages, 'parallel', tests=True, scripts=True, + others=['apps','engine','client','controller']) add_package(packages, 'quarantine', tests=True) add_package(packages, 'scripts') add_package(packages, 'testing', tests=True)