From 2d79d3e488770559c952a66466e9204f9092b621 2011-04-08 00:38:16 From: MinRK Date: 2011-04-08 00:38:16 Subject: [PATCH] adapt kernel's ipcluster and Launchers to newparallel --- diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index f881deb..e2c5ed3 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -90,14 +90,6 @@ def defaultblock(f, self, *args, **kwargs): # Classes #-------------------------------------------------------------------------- -class ResultDict(dict): - """A subclass of dict that raises errors if it has them.""" - def __getitem__(self, key): - res = dict.__getitem__(self, key) - if isinstance(res, error.KernelError): - raise res - return res - class Metadata(dict): """Subclass of dict for initializing metadata values.""" def __init__(self, *args, **kwargs): diff --git a/IPython/zmq/parallel/clusterdir.py b/IPython/zmq/parallel/clusterdir.py index b89f84b..8afddbb 100755 --- a/IPython/zmq/parallel/clusterdir.py +++ b/IPython/zmq/parallel/clusterdir.py @@ -35,18 +35,6 @@ from IPython.utils.path import ( from IPython.utils.traitlets import Unicode #----------------------------------------------------------------------------- -# Warnings control -#----------------------------------------------------------------------------- -# Twisted generates annoying warnings with Python 2.6, as will do other code -# that imports 'sets' as of today -warnings.filterwarnings('ignore', 'the sets module is deprecated', - DeprecationWarning ) - -# This one also comes from Twisted -warnings.filterwarnings('ignore', 'the sha module is deprecated', - DeprecationWarning) - -#----------------------------------------------------------------------------- # Module errors #----------------------------------------------------------------------------- diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index c052ee9..1039112 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -1,7 +1,6 @@ #!/usr/bin/env python """The IPython Controller with 0MQ -This is the master object that handles connections from engines and clients, -and monitors traffic through the various queues. +This is a collection of one Hub and several Schedulers. """ #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team @@ -15,75 +14,25 @@ and monitors traffic through the various queues. 
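The controller is no longer one monolithic object: per the new docstring, it is one Hub plus several Schedulers, and the ControllerFactory below runs each scheduler in its own daemon subprocess. A minimal sketch of that launch pattern; launch_scheduler here is a stand-in for the real one in IPython.zmq.parallel.scheduler, and the scheme names are illustrative:

from multiprocessing import Process

def launch_scheduler(name, scheme='lru'):
    # stand-in for scheduler.launch_scheduler; the real one runs a 0MQ queue
    print('scheduler %r running with scheme %r' % (name, scheme))

if __name__ == '__main__':
    children = []
    for name in ('mux', 'control', 'task'):
        q = Process(target=launch_scheduler, args=(name,), kwargs=dict(scheme='lru'))
        q.daemon = True  # schedulers die with the parent, as in ControllerFactory
        children.append(q)
        q.start()
    for q in children:
        q.join()
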
#----------------------------------------------------------------------------- from __future__ import print_function -import os -import sys -import time import logging from multiprocessing import Process import zmq -from zmq.eventloop import ioloop -from zmq.eventloop.zmqstream import ZMQStream -# from zmq.devices import ProcessMonitoredQueue # internal: from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, Str, Instance, List, Bool -from IPython.zmq.entry_point import bind_port -from entry_point import (make_base_argument_parser, select_random_ports, split_ports, - connect_logger, parse_url, signal_children, generate_exec_key, - local_logger) +from entry_point import signal_children -import streamsession as session -import heartmonitor from scheduler import launch_scheduler from hub import Hub, HubFactory -from dictdb import DictDB -try: - import pymongo -except ImportError: - MongoDB=None -else: - from mongodb import MongoDB - -#------------------------------------------------------------------------- -# Entry Point -#------------------------------------------------------------------------- +#----------------------------------------------------------------------------- +# Configurable +#----------------------------------------------------------------------------- -def make_argument_parser(): - """Make an argument parser""" - parser = make_base_argument_parser() - - parser.add_argument('--client', type=int, metavar='PORT', default=0, - help='set the XREP port for clients [default: random]') - parser.add_argument('--notice', type=int, metavar='PORT', default=0, - help='set the PUB socket for registration notification [default: random]') - parser.add_argument('--hb', type=str, metavar='PORTS', - help='set the 2 ports for heartbeats [default: random]') - parser.add_argument('--ping', type=int, default=100, - help='set the heartbeat period in ms [default: 100]') - parser.add_argument('--monitor', type=int, metavar='PORT', default=0, - help='set the SUB port for queue monitoring [default: random]') - parser.add_argument('--mux', type=str, metavar='PORTS', - help='set the XREP ports for the MUX queue [default: random]') - parser.add_argument('--task', type=str, metavar='PORTS', - help='set the XREP/XREQ ports for the task queue [default: random]') - parser.add_argument('--control', type=str, metavar='PORTS', - help='set the XREP ports for the control queue [default: random]') - parser.add_argument('--iopub', type=str, metavar='PORTS', - help='set the PUB/SUB ports for the iopub relay [default: random]') - parser.add_argument('--scheduler', type=str, default='lru', - choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], - help='select the task scheduler [default: Python LRU]') - parser.add_argument('--mongodb', action='store_true', - help='Use MongoDB task storage [default: in-memory]') - parser.add_argument('--session', type=str, default=None, - help='Manually specify the session id.') - - return parser class ControllerFactory(HubFactory): """Configurable for setting up a Hub and Schedulers.""" @@ -158,188 +107,4 @@ class ControllerFactory(HubFactory): q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) q.daemon=True children.append(q) - - -def main(argv=None): - """DO NOT USE ME ANYMORE""" - - parser = make_argument_parser() - - args = parser.parse_args(argv) - parse_url(args) - - iface="%s://%s"%(args.transport,args.ip)+':%i' - - random_ports = 0 - if args.hb: - hb = split_ports(args.hb, 2) - else: - 
hb = select_random_ports(2) - if args.mux: - mux = split_ports(args.mux, 2) - else: - mux = None - random_ports += 2 - if args.iopub: - iopub = split_ports(args.iopub, 2) - else: - iopub = None - random_ports += 2 - if args.task: - task = split_ports(args.task, 2) - else: - task = None - random_ports += 2 - if args.control: - control = split_ports(args.control, 2) - else: - control = None - random_ports += 2 - - ctx = zmq.Context() - loop = ioloop.IOLoop.instance() - - - # Registrar socket - reg = ZMQStream(ctx.socket(zmq.XREP), loop) - regport = bind_port(reg, args.ip, args.regport) - - ### Engine connections ### - - # heartbeat - hpub = ctx.socket(zmq.PUB) - bind_port(hpub, args.ip, hb[0]) - hrep = ctx.socket(zmq.XREP) - bind_port(hrep, args.ip, hb[1]) - - hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping) - hmon.start() - - ### Client connections ### - # Clientele socket - c = ZMQStream(ctx.socket(zmq.XREP), loop) - cport = bind_port(c, args.ip, args.client) - # Notifier socket - n = ZMQStream(ctx.socket(zmq.PUB), loop) - nport = bind_port(n, args.ip, args.notice) - - ### Key File ### - if args.execkey and not os.path.isfile(args.execkey): - generate_exec_key(args.execkey) - - thesession = session.StreamSession(username=args.ident or "controller", - keyfile=args.execkey, session=args.session) - - ### build and launch the queues ### - - # monitor socket - sub = ctx.socket(zmq.SUB) - sub.setsockopt(zmq.SUBSCRIBE, "") - monport = bind_port(sub, args.ip, args.monitor) - sub = ZMQStream(sub, loop) - - ports = select_random_ports(random_ports) - children = [] - - # IOPub relay (in a Process) - if not iopub: - iopub = (ports.pop(),ports.pop()) - q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') - q.bind_in(iface%iopub[1]) - q.bind_out(iface%iopub[0]) - q.setsockopt_in(zmq.SUBSCRIBE, '') - q.connect_mon(iface%monport) - q.daemon=True - q.start() - children.append(q.launcher) - - # Multiplexer Queue (in a Process) - if not mux: - mux = (ports.pop(),ports.pop()) - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') - q.bind_in(iface%mux[0]) - q.bind_out(iface%mux[1]) - q.connect_mon(iface%monport) - q.daemon=True - q.start() - children.append(q.launcher) - - # Control Queue (in a Process) - if not control: - control = (ports.pop(),ports.pop()) - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') - q.bind_in(iface%control[0]) - q.bind_out(iface%control[1]) - q.connect_mon(iface%monport) - q.daemon=True - q.start() - children.append(q.launcher) - # Task Queue (in a Process) - if not task: - task = (ports.pop(),ports.pop()) - if args.scheduler == 'pure': - q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') - q.bind_in(iface%task[0]) - q.bind_out(iface%task[1]) - q.connect_mon(iface%monport) - q.daemon=True - q.start() - children.append(q.launcher) - else: - log_addr = iface%args.logport if args.logport else None - sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, - log_addr, args.loglevel, args.scheduler) - print (sargs) - q = Process(target=launch_scheduler, args=sargs) - q.daemon=True - q.start() - children.append(q) - - if args.mongodb: - from mongodb import MongoDB - db = MongoDB(thesession.session) - else: - db = DictDB() - time.sleep(.25) - - # build connection dicts - engine_addrs = { - 'control' : iface%control[1], - 'mux': iface%mux[1], - 'heartbeat': (iface%hb[0], iface%hb[1]), - 'task' : iface%task[1], - 'iopub' : iface%iopub[1], - 'monitor' 
: iface%monport, - } - - client_addrs = { - 'control' : iface%control[0], - 'query': iface%cport, - 'mux': iface%mux[0], - 'task' : iface%task[0], - 'iopub' : iface%iopub[0], - 'notification': iface%nport - } - - # setup logging - if args.logport: - connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) - else: - local_logger(args.loglevel) - - # register relay of signals to the children - signal_children(children) - hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, - registrar=reg, clientele=c, notifier=n, db=db, - engine_addrs=engine_addrs, client_addrs=client_addrs) - - dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) - dc.start() - try: - loop.start() - except KeyboardInterrupt: - print ("interrupted, exiting...", file=sys.__stderr__) - -if __name__ == '__main__': - main() diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index c11e902..41509d9 100755 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -6,7 +6,6 @@ connected to the Controller's queue(s). from __future__ import print_function import sys import time -import traceback import uuid import logging from pprint import pprint @@ -21,12 +20,9 @@ from IPython.utils.traitlets import Instance, Str, Dict, Int, Type from factory import RegistrationFactory -from streamsession import Message, StreamSession -from streamkernel import Kernel, make_kernel +from streamsession import Message +from streamkernel import Kernel import heartmonitor -from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, - local_logger) -# import taskthread def printer(*msg): # print (logging.handlers, file=sys.__stdout__) @@ -107,16 +103,15 @@ class EngineFactory(RegistrationFactory): # print (hb_addrs) # # Redirect input streams and set a display hook. 
- # if self.out_stream_factory: - # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') - # sys.stdout.topic = 'engine.%i.stdout'%self.id - # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') - # sys.stderr.topic = 'engine.%i.stderr'%self.id - # if self.display_hook_factory: - # sys.displayhook = self.display_hook_factory(self.session, iopub_stream) - # sys.displayhook.topic = 'engine.%i.pyout'%self.id + if self.out_stream_factory: + sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') + sys.stdout.topic = 'engine.%i.stdout'%self.id + sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') + sys.stderr.topic = 'engine.%i.stderr'%self.id + if self.display_hook_factory: + sys.displayhook = self.display_hook_factory(self.session, iopub_stream) + sys.displayhook.topic = 'engine.%i.pyout'%self.id - # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, @@ -124,6 +119,7 @@ class EngineFactory(RegistrationFactory): self.kernel.start() heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) + # ioloop.DelayedCallback(heart.start, 1000, self.loop).start() heart.start() @@ -143,48 +139,3 @@ class EngineFactory(RegistrationFactory): dc = ioloop.DelayedCallback(self.register, 0, self.loop) dc.start() - - -def main(argv=None, user_ns=None): - """DO NOT USE ME ANYMORE""" - parser = make_base_argument_parser() - - args = parser.parse_args(argv) - - parse_url(args) - - iface="%s://%s"%(args.transport,args.ip)+':%i' - - loop = ioloop.IOLoop.instance() - session = StreamSession(keyfile=args.execkey) - # print (session.key) - ctx = zmq.Context() - - # setup logging - - reg_conn = iface % args.regport - print (reg_conn, file=sys.__stdout__) - print ("Starting the engine...", file=sys.__stderr__) - - reg = ctx.socket(zmq.PAIR) - reg.connect(reg_conn) - reg = zmqstream.ZMQStream(reg, loop) - - e = Engine(context=ctx, loop=loop, session=session, registrar=reg, - ident=args.ident or '', user_ns=user_ns) - if args.logport: - print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) - connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) - else: - local_logger(args.loglevel) - - dc = ioloop.DelayedCallback(e.start, 0, loop) - dc.start() - try: - loop.start() - except KeyboardInterrupt: - print ("interrupted, exiting...", file=sys.__stderr__) - -# Execution as a script -if __name__ == '__main__': - main() diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index 1a9ba76..eb1c63a 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -28,15 +28,6 @@ from IPython.core.ultratb import FormattedTB from IPython.external.argparse import ArgumentParser from IPython.zmq.log import EnginePUBHandler -def split_ports(s, n): - """Parser helper for multiport strings""" - if not s: - return tuple([0]*n) - ports = map(int, s.split(',')) - if len(ports) != n: - raise ValueError - return ports - _random_ports = set() def select_random_ports(n): @@ -57,18 +48,6 @@ def select_random_ports(n): _random_ports.add(port) return ports -def parse_url(args): - """Ensure args.url contains full transport://interface:port""" - if args.url: - iface = args.url.split('://',1) - if len(args) == 2: - args.transport,iface = iface - iface = iface.split(':') 
- args.ip = iface[0] - if iface[1]: - args.regport = iface[1] - args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) - def signal_children(children): """Relay interupt/term signals to children, for more solid process cleanup.""" def terminate_children(sig, frame): @@ -90,35 +69,7 @@ def generate_exec_key(keyfile): # set user-only RW permissions (0600) # this will have no effect on Windows os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) - - -def make_base_argument_parser(): - """ Creates an ArgumentParser for the generic arguments supported by all - ipcluster entry points. - """ - - parser = ArgumentParser() - parser.add_argument('--ip', type=str, default='127.0.0.1', - help='set the controller\'s IP address [default: local]') - parser.add_argument('--transport', type=str, default='tcp', - help='set the transport to use [default: tcp]') - parser.add_argument('--regport', type=int, metavar='PORT', default=10101, - help='set the XREP port for registration [default: 10101]') - parser.add_argument('--logport', type=int, metavar='PORT', default=0, - help='set the PUB port for remote logging [default: log to stdout]') - parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, - help='set the log level [default: INFO]') - parser.add_argument('--ident', type=str, - help='set the ZMQ identity [default: random]') - parser.add_argument('--packer', type=str, default='json', - choices=['json','pickle'], - help='set the message format method [default: json]') - parser.add_argument('--url', type=str, - help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') - parser.add_argument('--execkey', type=str, - help="File containing key for authenticating requests.") - return parser def integer_loglevel(loglevel): try: diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 622c75b..e215ece 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -158,26 +158,26 @@ class HubFactory(RegistrationFactory): subconstructors = List() _constructed = Bool(False) + def _ip_changed(self, name, old, new): + self.engine_ip = new + self.client_ip = new + self.monitor_ip = new + self._update_monitor_url() + def _update_monitor_url(self): self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) - def _sync_ips(self): - self.engine_ip = self.ip - self.client_ip = self.ip - self.monitor_ip = self.ip - self._update_monitor_url() - - def _sync_transports(self): - self.engine_transport = self.transport - self.client_transport = self.transport - self.monitor_transport = self.transport + def _transport_changed(self, name, old, new): + self.engine_transport = new + self.client_transport = new + self.monitor_transport = new self._update_monitor_url() def __init__(self, **kwargs): super(HubFactory, self).__init__(**kwargs) self._update_monitor_url() - self.on_trait_change(self._sync_ips, 'ip') - self.on_trait_change(self._sync_transports, 'transport') + # self.on_trait_change(self._sync_ips, 'ip') + # self.on_trait_change(self._sync_transports, 'transport') self.subconstructors.append(self.construct_hub) @@ -334,45 +334,11 @@ class Hub(HasTraits): """ super(Hub, self).__init__(**kwargs) - self.ids = set() - self.pending = set() - # self.keytable={} - # self.incoming_registrations={} - # self.engines = {} - # self.by_ident = {} - # self.clients = {} - # self.hearts = {} - # self.mia = set() self.registration_timeout = max(5000, 2*self.heartmonitor.period) - # this is the stuff that will move to DB: - # self.pending = 
set() # pending messages, keyed by msg_id - # self.queues = {} # pending msg_ids keyed by engine_id - # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id - # self.completed = {} # completed msg_ids keyed by engine_id - # self.all_completed = set() - # self._idcounter = 0 - # self.sockets = {} - # self.loop = loop - # self.session = session - # self.registrar = registrar - # self.clientele = clientele - # self.queue = queue - # self.heartmonitor = heartbeat - # self.notifier = notifier - # self.db = db # validate connection dicts: - # self.client_addrs = client_addrs validate_url_container(self.client_addrs) - - # assert isinstance(self.client_addrs['queue'], str) - # assert isinstance(self.client_addrs['control'], str) - # self.hb_addrs = hb_addrs validate_url_container(self.engine_addrs) - # self.engine_addrs = engine_addrs - # assert isinstance(self.engine_addrs['queue'], str) - # assert isinstance(self.engine_addrs['control'], str) - # assert len(engine_addrs['heartbeat']) == 2 # register our callbacks self.registrar.on_recv(self.dispatch_register_request) @@ -409,7 +375,9 @@ class Hub(HasTraits): @property def _next_id(self): - """gemerate a new ID""" + """generate a new ID. + + No longer reuse old ids, just count from 0.""" newid = self._idcounter self._idcounter += 1 return newid diff --git a/IPython/zmq/parallel/ipclusterapp.py b/IPython/zmq/parallel/ipclusterapp.py new file mode 100755 index 0000000..a311ef9 --- /dev/null +++ b/IPython/zmq/parallel/ipclusterapp.py @@ -0,0 +1,502 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +The ipcluster application. +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import logging +import os +import signal + +from zmq.eventloop import ioloop + +from IPython.external.argparse import ArgumentParser, SUPPRESS +from IPython.utils.importstring import import_item +from IPython.zmq.parallel.clusterdir import ( + ApplicationWithClusterDir, ClusterDirConfigLoader, + ClusterDirError, PIDFileError +) + + +#----------------------------------------------------------------------------- +# Module level variables +#----------------------------------------------------------------------------- + + +default_config_file_name = u'ipcluster_config.py' + + +_description = """\ +Start an IPython cluster for parallel computing.\n\n + +An IPython cluster consists of 1 controller and 1 or more engines. +This command automates the startup of these processes using a wide +range of startup methods (SSH, local processes, PBS, mpiexec, +Windows HPC Server 2008). To start a cluster with 4 engines on your +local host simply do 'ipcluster start -n 4'. For more complex usage +you will typically do 'ipcluster create -p mycluster', then edit +configuration files, followed by 'ipcluster start -p mycluster -n 4'. 
+""" + + +# Exit codes for ipcluster + +# This will be the exit code if the ipcluster appears to be running because +# a .pid file exists +ALREADY_STARTED = 10 + + +# This will be the exit code if ipcluster stop is run, but there is no .pid +# file to be found. +ALREADY_STOPPED = 11 + + +#----------------------------------------------------------------------------- +# Command line options +#----------------------------------------------------------------------------- + + +class IPClusterAppConfigLoader(ClusterDirConfigLoader): + + def _add_arguments(self): + # Don't call ClusterDirConfigLoader._add_arguments as we don't want + # its defaults on self.parser. Instead, we will put those on + # default options on our subparsers. + + # This has all the common options that all subcommands use + parent_parser1 = ArgumentParser( + add_help=False, + argument_default=SUPPRESS + ) + self._add_ipython_dir(parent_parser1) + self._add_log_level(parent_parser1) + + # This has all the common options that other subcommands use + parent_parser2 = ArgumentParser( + add_help=False, + argument_default=SUPPRESS + ) + self._add_cluster_profile(parent_parser2) + self._add_cluster_dir(parent_parser2) + self._add_work_dir(parent_parser2) + paa = parent_parser2.add_argument + paa('--log-to-file', + action='store_true', dest='Global.log_to_file', + help='Log to a file in the log directory (default is stdout)') + + # Create the object used to create the subparsers. + subparsers = self.parser.add_subparsers( + dest='Global.subcommand', + title='ipcluster subcommands', + description= + """ipcluster has a variety of subcommands. The general way of + running ipcluster is 'ipcluster <cmd> [options]'. To get help + on a particular subcommand do 'ipcluster <cmd> -h'.""" + # help="For more help, type 'ipcluster <cmd> -h'", + ) + + # The "list" subcommand parser + parser_list = subparsers.add_parser( + 'list', + parents=[parent_parser1], + argument_default=SUPPRESS, + help="List all clusters in cwd and ipython_dir.", + description= + """List all available clusters, by cluster directory, that can + be found in the current working directory or in the ipython + directory. Cluster directories are named using the convention + 'cluster_<profile>'.""" + ) + + # The "create" subcommand parser + parser_create = subparsers.add_parser( + 'create', + parents=[parent_parser1, parent_parser2], + argument_default=SUPPRESS, + help="Create a new cluster directory.", + description= + """Create an ipython cluster directory by its profile name or + cluster directory path. Cluster directories contain + configuration, log and security related files and are named + using the convention 'cluster_<profile>'. By default they are + located in your ipython directory. Once created, you will + probably need to edit the configuration files in the cluster + directory to configure your cluster. Most users will create a + cluster directory by profile name, + 'ipcluster create -p mycluster', which will put the directory + in '<ipython_dir>/cluster_mycluster'. + """ + ) + paa = parser_create.add_argument + paa('--reset-config', + dest='Global.reset_config', action='store_true', + help= + """Recopy the default config files to the cluster directory. + You will lose any modifications you have made to these files.""") + + # The "start" subcommand parser + parser_start = subparsers.add_parser( + 'start', + parents=[parent_parser1, parent_parser2], + argument_default=SUPPRESS, + help="Start a cluster.", + description= + """Start an ipython cluster by its profile name or cluster + directory. 
Cluster directories contain configuration, log and + security related files and are named using the convention + 'cluster_<profile>' and should be created using the 'create' + subcommand of 'ipcluster'. If your cluster directory is in + the cwd or the ipython directory, you can simply refer to it + using its profile name, 'ipcluster start -n 4 -p <profile>', + otherwise use the '--cluster-dir' option. + """ + ) + paa = parser_start.add_argument + paa('-n', '--number', + type=int, dest='Global.n', + help='The number of engines to start.', + metavar='Global.n') + paa('--clean-logs', + dest='Global.clean_logs', action='store_true', + help='Delete old log files before starting.') + paa('--no-clean-logs', + dest='Global.clean_logs', action='store_false', + help="Don't delete old log files before starting.") + paa('--daemon', + dest='Global.daemonize', action='store_true', + help='Daemonize the ipcluster program. This implies --log-to-file') + paa('--no-daemon', + dest='Global.daemonize', action='store_false', + help="Don't daemonize the ipcluster program.") + + # The "stop" subcommand parser + parser_stop = subparsers.add_parser( + 'stop', + parents=[parent_parser1, parent_parser2], + argument_default=SUPPRESS, + help="Stop a running cluster.", + description= + """Stop a running ipython cluster by its profile name or cluster + directory. Cluster directories are named using the convention + 'cluster_<profile>'. If your cluster directory is in + the cwd or the ipython directory, you can simply refer to it + using its profile name, 'ipcluster stop -p <profile>', otherwise + use the '--cluster-dir' option. + """ + ) + paa = parser_stop.add_argument + paa('--signal', + dest='Global.signal', type=int, + help="The signal number to use in stopping the cluster (default=2).", + metavar="Global.signal") + + +#----------------------------------------------------------------------------- +# Main application +#----------------------------------------------------------------------------- + + +class IPClusterApp(ApplicationWithClusterDir): + + name = u'ipclusterz' + description = _description + usage = None + command_line_loader = IPClusterAppConfigLoader + default_config_file_name = default_config_file_name + default_log_level = logging.INFO + auto_create_cluster_dir = False + + def create_default_config(self): + super(IPClusterApp, self).create_default_config() + self.default_config.Global.controller_launcher = \ + 'IPython.zmq.parallel.launcher.LocalControllerLauncher' + self.default_config.Global.engine_launcher = \ + 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher' + self.default_config.Global.n = 2 + self.default_config.Global.reset_config = False + self.default_config.Global.clean_logs = True + self.default_config.Global.signal = 2 + self.default_config.Global.daemonize = False + + def find_resources(self): + subcommand = self.command_line_config.Global.subcommand + if subcommand=='list': + self.list_cluster_dirs() + # Exit immediately because there is nothing left to do. + self.exit() + elif subcommand=='create': + self.auto_create_cluster_dir = True + super(IPClusterApp, self).find_resources() + elif subcommand=='start' or subcommand=='stop': + self.auto_create_cluster_dir = True + try: + super(IPClusterApp, self).find_resources() + except ClusterDirError: + raise ClusterDirError( + "Could not find a cluster directory. A cluster dir must " + "be created before running 'ipcluster start'. Do " + "'ipcluster create -h' or 'ipcluster list -h' for more " + "information about creating and listing cluster dirs." 
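The config loader above builds the list/create/start/stop subcommands on top of shared parent parsers. The same wiring in a self-contained sketch, with stdlib argparse standing in for IPython.external.argparse and illustrative option names:

from argparse import ArgumentParser, SUPPRESS

# shared options live on a parent parser, like parent_parser1/2 above
common = ArgumentParser(add_help=False, argument_default=SUPPRESS)
common.add_argument('--log-to-file', action='store_true', dest='log_to_file')

parser = ArgumentParser(prog='ipcluster')
subparsers = parser.add_subparsers(dest='subcommand')

parser_start = subparsers.add_parser('start', parents=[common],
                                     help='Start a cluster.')
parser_start.add_argument('-n', '--number', type=int, dest='n')

args = parser.parse_args(['start', '-n', '4'])
print('%s n=%s' % (args.subcommand, args.n))  # -> start n=4
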
+ ) + + def list_cluster_dirs(self): + # Find the search paths + cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','') + if cluster_dir_paths: + cluster_dir_paths = cluster_dir_paths.split(':') + else: + cluster_dir_paths = [] + try: + ipython_dir = self.command_line_config.Global.ipython_dir + except AttributeError: + ipython_dir = self.default_config.Global.ipython_dir + paths = [os.getcwd(), ipython_dir] + \ + cluster_dir_paths + paths = list(set(paths)) + + self.log.info('Searching for cluster dirs in paths: %r' % paths) + for path in paths: + files = os.listdir(path) + for f in files: + full_path = os.path.join(path, f) + if os.path.isdir(full_path) and f.startswith('cluster_'): + profile = full_path.split('_')[-1] + start_cmd = 'ipcluster start -p %s -n 4' % profile + print start_cmd + " ==> " + full_path + + def pre_construct(self): + # IPClusterApp.pre_construct() is where we cd to the working directory. + super(IPClusterApp, self).pre_construct() + config = self.master_config + try: + daemon = config.Global.daemonize + if daemon: + config.Global.log_to_file = True + except AttributeError: + pass + + def construct(self): + config = self.master_config + subcmd = config.Global.subcommand + reset = config.Global.reset_config + if subcmd == 'list': + return + if subcmd == 'create': + self.log.info('Copying default config files to cluster directory ' + '[overwrite=%r]' % (reset,)) + self.cluster_dir_obj.copy_all_config_files(overwrite=reset) + if subcmd =='start': + self.cluster_dir_obj.copy_all_config_files(overwrite=False) + self.start_logging() + self.loop = ioloop.IOLoop.instance() + # reactor.callWhenRunning(self.start_launchers) + dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) + dc.start() + + def start_launchers(self): + config = self.master_config + + # Create the launchers. In both cases, we set the work_dir of + # the launcher to the cluster_dir. This is where the launcher's + # subprocesses will be launched. It is not where the controller + # and engine will be launched. + el_class = import_item(config.Global.engine_launcher) + self.engine_launcher = el_class( + work_dir=self.cluster_dir, config=config + ) + cl_class = import_item(config.Global.controller_launcher) + self.controller_launcher = cl_class( + work_dir=self.cluster_dir, config=config + ) + + # Setup signals + signal.signal(signal.SIGINT, self.sigint_handler) + + # Setup the observing of stopping. If the controller dies, shut + # everything down as that will be completely fatal for the engines. + self.controller_launcher.on_stop(self.stop_launchers) + # d1.addCallback(self.stop_launchers) + # But, we don't monitor the stopping of engines. An engine dying + # is just fine and in principle a user could start a new engine. + # Also, if we did monitor engine stopping, it is difficult to + # know what to do when only some engines die. Currently, the + # observing of engine stopping is inconsistent. Some launchers + # might trigger on a single engine stopping, others wait until + # all stop. TODO: think more about how to handle this. + + # Start the controller and engines + self._stopping = False # Make sure stop_launchers is not called 2x. 
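start_launchers resolves the launcher classes from dotted-name strings in the config via import_item. A rough functional equivalent of what IPython.utils.importstring.import_item does (a sketch of the idea, not its actual source):

def import_item(name):
    # 'package.module.ClassName' -> the ClassName object
    package, obj = name.rsplit('.', 1)
    module = __import__(package, fromlist=[obj])
    return getattr(module, obj)

launcher_class = import_item('logging.handlers.RotatingFileHandler')
print(launcher_class)  # -> the RotatingFileHandler class
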
+ d = self.start_controller() + self.start_engines() + self.startup_message() + # d.addCallback(self.start_engines) + # d.addCallback(self.startup_message) + # If the controller or engines fail to start, stop everything + # d.addErrback(self.stop_launchers) + return d + + def startup_message(self, r=None): + logging.info("IPython cluster: started") + return r + + def start_controller(self, r=None): + # logging.info("In start_controller") + config = self.master_config + d = self.controller_launcher.start( + cluster_dir=config.Global.cluster_dir + ) + return d + + def start_engines(self, r=None): + # logging.info("In start_engines") + config = self.master_config + d = self.engine_launcher.start( + config.Global.n, + cluster_dir=config.Global.cluster_dir + ) + return d + + def stop_controller(self, r=None): + # logging.info("In stop_controller") + if self.controller_launcher.running: + return self.controller_launcher.stop() + + def stop_engines(self, r=None): + # logging.info("In stop_engines") + if self.engine_launcher.running: + d = self.engine_launcher.stop() + # d.addErrback(self.log_err) + return d + else: + return None + + def log_err(self, f): + logging.error(f.getTraceback()) + return None + + def stop_launchers(self, r=None): + if not self._stopping: + self._stopping = True + # if isinstance(r, failure.Failure): + # logging.error('Unexpected error in ipcluster:') + # logging.info(r.getTraceback()) + logging.error("IPython cluster: stopping") + # These return deferreds. We are not doing anything with them + # but we are holding refs to them as a reminder that they + # do return deferreds. + d1 = self.stop_engines() + d2 = self.stop_controller() + # Wait a few seconds to let things shut down. + dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop) + dc.start() + # reactor.callLater(4.0, reactor.stop) + + def sigint_handler(self, signum, frame): + self.stop_launchers() + + def start_logging(self): + # Remove old log files of the controller and engine + if self.master_config.Global.clean_logs: + log_dir = self.master_config.Global.log_dir + for f in os.listdir(log_dir): + if f.startswith('ipengine' + '-'): + if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): + os.remove(os.path.join(log_dir, f)) + if f.startswith('ipcontroller' + '-'): + if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): + os.remove(os.path.join(log_dir, f)) + # This will remove old log files for ipcluster itself + super(IPClusterApp, self).start_logging() + + def start_app(self): + """Start the application, depending on what subcommand is used.""" + subcmd = self.master_config.Global.subcommand + if subcmd=='create' or subcmd=='list': + return + elif subcmd=='start': + self.start_app_start() + elif subcmd=='stop': + self.start_app_stop() + + def start_app_start(self): + """Start the app for the start subcommand.""" + config = self.master_config + # First see if the cluster is already running + try: + pid = self.get_pid_from_file() + except PIDFileError: + pass + else: + self.log.critical( + 'Cluster is already running with [pid=%s]. ' + 'Use "ipcluster stop" to stop the cluster.' % pid + ) + # Here I exit with an unusual exit status that other processes + # can watch for to learn how I exited. + self.exit(ALREADY_STARTED) + + # Now log and daemonize + self.log.info( + 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize + ) + # TODO: Get daemonize working on Windows or as a Windows Server. 
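stop_launchers stops the engines and controller, then waits a few seconds before halting the loop, using pyzmq's DelayedCallback where the Twisted version used reactor.callLater. The one-shot timer idiom in isolation (assuming the pyzmq ioloop of this era):

from zmq.eventloop import ioloop

loop = ioloop.IOLoop.instance()

def shutdown():
    print('grace period over; stopping the loop')
    loop.stop()

# fire shutdown() once, 4000 ms from now, as stop_launchers schedules loop.stop
dc = ioloop.DelayedCallback(shutdown, 4000, loop)
dc.start()
loop.start()
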
+ if config.Global.daemonize: + if os.name=='posix': + from twisted.scripts._twistd_unix import daemonize + daemonize() + + # Now write the new pid file AFTER our new forked pid is active. + self.write_pid_file() + try: + self.loop.start() + except: + logging.info("stopping...") + self.remove_pid_file() + + def start_app_stop(self): + """Start the app for the stop subcommand.""" + config = self.master_config + try: + pid = self.get_pid_from_file() + except PIDFileError: + self.log.critical( + 'Problem reading pid file, cluster is probably not running.' + ) + # Here I exit with an unusual exit status that other processes + # can watch for to learn how I exited. + self.exit(ALREADY_STOPPED) + else: + if os.name=='posix': + sig = config.Global.signal + self.log.info( + "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig) + ) + os.kill(pid, sig) + elif os.name=='nt': + # As of right now, we don't support daemonize on Windows, so + # stop will not do anything. Minimally, it should clean up the + # old .pid files. + self.remove_pid_file() + + +def launch_new_instance(): + """Create and run the IPython cluster.""" + app = IPClusterApp() + app.start() + + +if __name__ == '__main__': + launch_new_instance() + diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py index b4cce50..1a65825 100755 --- a/IPython/zmq/parallel/ipcontrollerapp.py +++ b/IPython/zmq/parallel/ipcontrollerapp.py @@ -123,6 +123,11 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader): help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' 'connections [default: random]', metavar='Hub.hb_ports') + paa('--ping', + type=int, dest='HubFactory.ping', + help='The frequency at which the Hub pings the engines for heartbeats ' + ' (in ms) [default: 100]', + metavar='Hub.ping') # Client config paa('--client-ip', @@ -204,10 +209,10 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader): help='Try to reuse existing execution keys.') paa('--no-secure', action='store_false', dest='Global.secure', - help='Turn off execution keys.') + help='Turn off execution keys (default).') paa('--secure', action='store_true', dest='Global.secure', - help='Turn on execution keys (default).') + help='Turn on execution keys.') paa('--execkey', type=str, dest='Global.exec_key', help='path to a file containing an execution key.', @@ -280,6 +285,19 @@ class IPControllerApp(ApplicationWithClusterDir): except: self.log.error("Couldn't construct the Controller", exc_info=True) self.exit(1) + + def save_urls(self): + """save the registration urls to files.""" + c = self.master_config + + sec_dir = c.Global.security_dir + cf = self.factory + + with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: + f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) + + with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: + f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) def import_statements(self): @@ -291,19 +309,19 @@ class IPControllerApp(ApplicationWithClusterDir): except: self.log.msg("Error running statement: %s" % s) - # def start_logging(self): - # super(IPControllerApp, self).start_logging() - # if self.master_config.Global.log_url: - # context = self.factory.context - # lsock = context.socket(zmq.PUB) - # lsock.connect(self.master_config.Global.log_url) - # handler = PUBHandler(lsock) - # handler.root_topic = 'controller' - # handler.setLevel(self.log_level) - # self.log.addHandler(handler) + def start_logging(self): + 
super(IPControllerApp, self).start_logging() + if self.master_config.Global.log_url: + context = self.factory.context + lsock = context.socket(zmq.PUB) + lsock.connect(self.master_config.Global.log_url) + handler = PUBHandler(lsock) + handler.root_topic = 'controller' + handler.setLevel(self.log_level) + self.log.addHandler(handler) # def start_app(self): - # Start the controller service. + # Start the subprocesses: self.factory.start() self.write_pid_file(overwrite=True) try: diff --git a/IPython/zmq/parallel/launcher.py b/IPython/zmq/parallel/launcher.py new file mode 100644 index 0000000..1b05ee9 --- /dev/null +++ b/IPython/zmq/parallel/launcher.py @@ -0,0 +1,824 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +Facilities for launching IPython processes asynchronously. +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import os +import re +import sys +import logging + +from signal import SIGINT, SIGTERM +try: + from signal import SIGKILL +except ImportError: + # Windows has no SIGKILL; fall back to SIGTERM + SIGKILL=SIGTERM + +from subprocess import Popen, PIPE + +from zmq.eventloop import ioloop + +from IPython.config.configurable import Configurable +from IPython.utils.traitlets import Str, Int, List, Unicode, Instance +from IPython.utils.path import get_ipython_module_path +from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError + +# from IPython.kernel.winhpcjob import ( +# IPControllerTask, IPEngineTask, +# IPControllerJob, IPEngineSetJob +# ) + + +#----------------------------------------------------------------------------- +# Paths to the kernel apps +#----------------------------------------------------------------------------- + + +ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( + 'IPython.zmq.parallel.ipclusterapp' +)) + +ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( + 'IPython.zmq.parallel.ipengineapp' +)) + +ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( + 'IPython.zmq.parallel.ipcontrollerapp' +)) + +#----------------------------------------------------------------------------- +# Base launchers and errors +#----------------------------------------------------------------------------- + + +class LauncherError(Exception): + pass + + +class ProcessStateError(LauncherError): + pass + + +class UnknownStatus(LauncherError): + pass + + +class BaseLauncher(Configurable): + """An abstraction for starting, stopping and signaling a process.""" + + # In all of the launchers, the work_dir is where child processes will be + # run. This will usually be the cluster_dir, but may not be. Any work_dir + # passed into the __init__ method will override the config value. + # This should not be used to set the work_dir for the actual engine + # and controller. Instead, use their own config files or the + # controller_args, engine_args attributes of the launchers to add + # the --work-dir option. 
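BaseLauncher, defined next, replaces Twisted deferreds with a plain callback list and a three-state lifecycle. A condensed, runnable model of that pattern (LauncherSketch and report are illustrative names, not part of the patch):

class LauncherSketch(object):
    """Condensed model of the BaseLauncher lifecycle defined below."""
    def __init__(self):
        self.state = 'before'  # before -> running -> after
        self.stop_callbacks = []
        self.stop_data = None

    def on_stop(self, f):
        # fire immediately if the process already stopped, else queue it
        if self.state == 'after':
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        self.state = 'running'
        return data

    def notify_stop(self, data):
        self.stop_data = data
        self.state = 'after'
        while self.stop_callbacks:
            self.stop_callbacks.pop()(data)
        return data

def report(data):
    print('stopped: %r' % data)

l = LauncherSketch()
l.notify_start({'pid': 1234})
l.on_stop(report)
l.notify_stop({'exit_code': 0, 'pid': 1234})
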
+ work_dir = Unicode(u'.') + loop = Instance('zmq.eventloop.ioloop.IOLoop') + def _loop_default(self): + return ioloop.IOLoop.instance() + + def __init__(self, work_dir=u'.', config=None): + super(BaseLauncher, self).__init__(work_dir=work_dir, config=config) + self.state = 'before' # can be before, running, after + self.stop_callbacks = [] + self.start_data = None + self.stop_data = None + + @property + def args(self): + """A list of cmd and args that will be used to start the process. + + This is what is passed to :func:`spawnProcess` and the first element + will be the process name. + """ + return self.find_args() + + def find_args(self): + """The ``.args`` property calls this to find the args list. + + Subclasses should implement this to construct the cmd and args. + """ + raise NotImplementedError('find_args must be implemented in a subclass') + + @property + def arg_str(self): + """The string form of the program arguments.""" + return ' '.join(self.args) + + @property + def running(self): + """Am I running.""" + if self.state == 'running': + return True + else: + return False + + def start(self): + """Start the process. + + This must return a deferred that fires with information about the + process starting (like a pid, job id, etc.). + """ + raise NotImplementedError('start must be implemented in a subclass') + + def stop(self): + """Stop the process and notify observers of stopping. + + This must return a deferred that fires with information about the + processing stopping, like errors that occur while the process is + attempting to be shut down. This deferred won't fire when the process + actually stops. To observe the actual process stopping, see + :func:`observe_stop`. + """ + raise NotImplementedError('stop must be implemented in a subclass') + + def on_stop(self, f): + """Get a deferred that will fire when the process stops. + + The deferred will fire with data that contains information about + the exit status of the process. + """ + if self.state=='after': + return f(self.stop_data) + else: + self.stop_callbacks.append(f) + + def notify_start(self, data): + """Call this to trigger startup actions. + + This logs the process startup and sets the state to 'running'. It is + a pass-through so it can be used as a callback. + """ + + logging.info('Process %r started: %r' % (self.args[0], data)) + self.start_data = data + self.state = 'running' + return data + + def notify_stop(self, data): + """Call this to trigger process stop actions. + + This logs the process stopping and sets the state to 'after'. Call + this to trigger all the deferreds from :func:`observe_stop`.""" + + logging.info('Process %r stopped: %r' % (self.args[0], data)) + self.stop_data = data + self.state = 'after' + for i in range(len(self.stop_callbacks)): + d = self.stop_callbacks.pop() + d(data) + return data + + def signal(self, sig): + """Signal the process. + + Return a semi-meaningless deferred after signaling the process. + + Parameters + ---------- + sig : str or int + 'KILL', 'INT', etc., or any signal number + """ + raise NotImplementedError('signal must be implemented in a subclass') + + +#----------------------------------------------------------------------------- +# Local process launchers +#----------------------------------------------------------------------------- + + +class LocalProcessLauncher(BaseLauncher): + """Start and stop an external process in an asynchronous manner. + + This will launch the external process with a working directory of + ``self.work_dir``. 
+ """ + + # This is used to construct self.args, which is passed to + # spawnProcess. + cmd_and_args = List([]) + poll_frequency = Int(100) # in ms + + def __init__(self, work_dir=u'.', config=None): + super(LocalProcessLauncher, self).__init__( + work_dir=work_dir, config=config + ) + self.process = None + self.start_deferred = None + self.poller = None + + def find_args(self): + return self.cmd_and_args + + def start(self): + if self.state == 'before': + self.process = Popen(self.args, + stdout=PIPE,stderr=PIPE,stdin=PIPE, + env=os.environ, + cwd=self.work_dir + ) + + self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) + self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) + self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) + self.poller.start() + self.notify_start(self.process.pid) + else: + s = 'The process was already started and has state: %r' % self.state + raise ProcessStateError(s) + + def stop(self): + return self.interrupt_then_kill() + + def signal(self, sig): + if self.state == 'running': + self.process.send_signal(sig) + + def interrupt_then_kill(self, delay=2.0): + """Send INT, wait a delay and then send KILL.""" + self.signal(SIGINT) + self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop) + self.killer.start() + + # callbacks, etc: + + def handle_stdout(self, fd, events): + line = self.process.stdout.readline() + # a stopped process will be readable but return empty strings + if line: + logging.info(line[:-1]) + else: + self.poll() + + def handle_stderr(self, fd, events): + line = self.process.stderr.readline() + # a stopped process will be readable but return empty strings + if line: + logging.error(line[:-1]) + else: + self.poll() + + def poll(self): + status = self.process.poll() + if status is not None: + self.poller.stop() + self.loop.remove_handler(self.process.stdout.fileno()) + self.loop.remove_handler(self.process.stderr.fileno()) + self.notify_stop(dict(exit_code=status, pid=self.process.pid)) + return status + +class LocalControllerLauncher(LocalProcessLauncher): + """Launch a controller as a regular external process.""" + + controller_cmd = List(ipcontroller_cmd_argv, config=True) + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) + + def find_args(self): + return self.controller_cmd + self.controller_args + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + logging.info("Starting LocalControllerLauncher: %r" % self.args) + return super(LocalControllerLauncher, self).start() + + +class LocalEngineLauncher(LocalProcessLauncher): + """Launch a single engine as a regular external process.""" + + engine_cmd = List(ipengine_cmd_argv, config=True) + # Command line arguments for ipengine. 
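LocalProcessLauncher watches the child's pipes with loop.add_handler instead of a Twisted process protocol. A minimal runnable sketch of the same fd-watching idiom for a single pipe:

import sys
from subprocess import Popen, PIPE

from zmq.eventloop import ioloop

loop = ioloop.IOLoop.instance()
child = Popen([sys.executable, '-c', "print('hello from the child')"],
              stdout=PIPE)

def handle_stdout(fd, events):
    line = child.stdout.readline()
    if line:
        # forward child output, as handle_stdout above forwards to logging
        print('child said: %s' % line.rstrip())
    else:
        # an empty read means the pipe closed: the child exited
        loop.remove_handler(fd)
        loop.stop()

loop.add_handler(child.stdout.fileno(), handle_stdout, loop.READ)
loop.start()
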
+ engine_args = List( + ['--log-to-file','--log-level', str(logging.ERROR)], config=True + ) + + def find_args(self): + return self.engine_cmd + self.engine_args + + def start(self, cluster_dir): + """Start the engine by cluster_dir.""" + self.engine_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + return super(LocalEngineLauncher, self).start() + + +class LocalEngineSetLauncher(BaseLauncher): + """Launch a set of engines as regular external processes.""" + + # Command line arguments for ipengine. + engine_args = List( + ['--log-to-file','--log-level', str(logging.ERROR)], config=True + ) + # launcher class + launcher_class = LocalEngineLauncher + + def __init__(self, work_dir=u'.', config=None): + super(LocalEngineSetLauncher, self).__init__( + work_dir=work_dir, config=config + ) + self.launchers = {} + self.stop_data = {} + + def start(self, n, cluster_dir): + """Start n engines by profile or cluster_dir.""" + self.cluster_dir = unicode(cluster_dir) + dlist = [] + for i in range(n): + el = self.launcher_class(work_dir=self.work_dir, config=self.config) + # Copy the engine args over to each engine launcher. + import copy + el.engine_args = copy.deepcopy(self.engine_args) + el.on_stop(self._notice_engine_stopped) + d = el.start(cluster_dir) + if i==0: + logging.info("Starting LocalEngineSetLauncher: %r" % el.args) + self.launchers[i] = el + dlist.append(d) + self.notify_start(dlist) + # The consumeErrors here could be dangerous + # dfinal = gatherBoth(dlist, consumeErrors=True) + # dfinal.addCallback(self.notify_start) + return dlist + + def find_args(self): + return ['engine set'] + + def signal(self, sig): + dlist = [] + for el in self.launchers.itervalues(): + d = el.signal(sig) + dlist.append(d) + # dfinal = gatherBoth(dlist, consumeErrors=True) + return dlist + + def interrupt_then_kill(self, delay=1.0): + dlist = [] + for el in self.launchers.itervalues(): + d = el.interrupt_then_kill(delay) + dlist.append(d) + # dfinal = gatherBoth(dlist, consumeErrors=True) + return dlist + + def stop(self): + return self.interrupt_then_kill() + + def _notice_engine_stopped(self, data): + print "notice", data + pid = data['pid'] + for idx,el in self.launchers.iteritems(): + if el.process.pid == pid: + break + self.launchers.pop(idx) + self.stop_data[idx] = data + if not self.launchers: + self.notify_stop(self.stop_data) + + +#----------------------------------------------------------------------------- +# MPIExec launchers +#----------------------------------------------------------------------------- + + +class MPIExecLauncher(LocalProcessLauncher): + """Launch an external process using mpiexec.""" + + # The mpiexec command to use in starting the process. + mpi_cmd = List(['mpiexec'], config=True) + # The command line arguments to pass to mpiexec. + mpi_args = List([], config=True) + # The program to start using mpiexec. + program = List(['date'], config=True) + # The command line argument to the program. + program_args = List([], config=True) + # The number of instances of the program to start. 
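find_args below assembles mpiexec's argv from the five configurable pieces above. One detail worth keeping in mind: every element handed to Popen must be a string, so n needs an explicit str() conversion (mpi_argv is an illustrative helper, not part of the patch):

def mpi_argv(mpi_cmd, n, mpi_args, program, program_args):
    # mpiexec -n <n> <mpi_args> <program> <program_args>
    return mpi_cmd + ['-n', str(n)] + mpi_args + program + program_args

argv = mpi_argv(['mpiexec'], 4, [], ['ipengine'], ['--log-level', '40'])
print(' '.join(argv))  # -> mpiexec -n 4 ipengine --log-level 40
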
+ n = Int(1, config=True) + + def find_args(self): + """Build self.args using all the fields.""" + return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ + self.program + self.program_args + + def start(self, n): + """Start n instances of the program using mpiexec.""" + self.n = n + return super(MPIExecLauncher, self).start() + + +class MPIExecControllerLauncher(MPIExecLauncher): + """Launch a controller using mpiexec.""" + + controller_cmd = List(ipcontroller_cmd_argv, config=True) + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) + n = Int(1, config=False) + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + logging.info("Starting MPIExecControllerLauncher: %r" % self.args) + return super(MPIExecControllerLauncher, self).start(1) + + def find_args(self): + return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ + self.controller_cmd + self.controller_args + + +class MPIExecEngineSetLauncher(MPIExecLauncher): + + engine_cmd = List(ipengine_cmd_argv, config=True) + # Command line arguments for ipengine. + engine_args = List( + ['--log-to-file','--log-level', str(logging.ERROR)], config=True + ) + n = Int(1, config=True) + + def start(self, n, cluster_dir): + """Start n engines by profile or cluster_dir.""" + self.engine_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + self.n = n + logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args) + return super(MPIExecEngineSetLauncher, self).start(n) + + def find_args(self): + return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ + self.engine_cmd + self.engine_args + + +#----------------------------------------------------------------------------- +# SSH launchers +#----------------------------------------------------------------------------- + +# TODO: Get SSH Launcher working again. + +class SSHLauncher(LocalProcessLauncher): + """A minimal launcher for ssh. + + To be useful this will probably have to be extended to use the ``sshx`` + idea for environment variables. There could be other things this needs + as well. + """ + + ssh_cmd = List(['ssh'], config=True) + ssh_args = List([], config=True) + program = List(['date'], config=True) + program_args = List([], config=True) + hostname = Str('', config=True) + user = Str(os.environ.get('USER','username'), config=True) + location = Str('') + + def _hostname_changed(self, name, old, new): + self.location = '%s@%s' % (self.user, new) + + def _user_changed(self, name, old, new): + self.location = '%s@%s' % (new, self.hostname) + + def find_args(self): + return self.ssh_cmd + self.ssh_args + [self.location] + \ + self.program + self.program_args + + def start(self, cluster_dir, hostname=None, user=None): + if hostname is not None: + self.hostname = hostname + if user is not None: + self.user = user + return super(SSHLauncher, self).start() + + +class SSHControllerLauncher(SSHLauncher): + + program = List(ipcontroller_cmd_argv, config=True) + # Command line arguments to ipcontroller. + program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) + + +class SSHEngineLauncher(SSHLauncher): + program = List(ipengine_cmd_argv, config=True) + # Command line arguments for ipengine. 
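SSHLauncher derives location from user and hostname through the same _<trait>_changed hooks the HubFactory hunk adopted earlier; traitlets invokes them automatically on assignment. A condensed sketch of that convention (Location is an illustrative name):

from IPython.utils.traitlets import HasTraits, Str

class Location(HasTraits):
    user = Str('me')
    hostname = Str('localhost')
    location = Str('')

    def _hostname_changed(self, name, old, new):
        # traitlets calls _<trait>_changed whenever the trait is assigned
        self.location = '%s@%s' % (self.user, new)

loc = Location()
loc.hostname = 'cluster.example.com'
print(loc.location)  # -> me@cluster.example.com
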
+ program_args = List( + ['--log-to-file','--log-level', str(logging.ERROR)], config=True + ) + +class SSHEngineSetLauncher(LocalEngineSetLauncher): + launcher_class = SSHEngineLauncher + + +#----------------------------------------------------------------------------- +# Windows HPC Server 2008 scheduler launchers +#----------------------------------------------------------------------------- + + +# # This is only used on Windows. +# def find_job_cmd(): +# if os.name=='nt': +# try: +# return find_cmd('job') +# except FindCmdError: +# return 'job' +# else: +# return 'job' +# +# +# class WindowsHPCLauncher(BaseLauncher): +# +# # A regular expression used to get the job id from the output of the +# # submit_command. +# job_id_regexp = Str(r'\d+', config=True) +# # The filename of the instantiated job script. +# job_file_name = Unicode(u'ipython_job.xml', config=True) +# # The full path to the instantiated job script. This gets made dynamically +# # by combining the work_dir with the job_file_name. +# job_file = Unicode(u'') +# # The hostname of the scheduler to submit the job to +# scheduler = Str('', config=True) +# job_cmd = Str(find_job_cmd(), config=True) +# +# def __init__(self, work_dir=u'.', config=None): +# super(WindowsHPCLauncher, self).__init__( +# work_dir=work_dir, config=config +# ) +# +# @property +# def job_file(self): +# return os.path.join(self.work_dir, self.job_file_name) +# +# def write_job_file(self, n): +# raise NotImplementedError("Implement write_job_file in a subclass.") +# +# def find_args(self): +# return ['job.exe'] +# +# def parse_job_id(self, output): +# """Take the output of the submit command and return the job id.""" +# m = re.search(self.job_id_regexp, output) +# if m is not None: +# job_id = m.group() +# else: +# raise LauncherError("Job id couldn't be determined: %s" % output) +# self.job_id = job_id +# logging.info('Job started with job id: %r' % job_id) +# return job_id +# +# @inlineCallbacks +# def start(self, n): +# """Start n copies of the process using the Win HPC job scheduler.""" +# self.write_job_file(n) +# args = [ +# 'submit', +# '/jobfile:%s' % self.job_file, +# '/scheduler:%s' % self.scheduler +# ] +# logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) +# # Twisted will raise DeprecationWarnings if we try to pass unicode to this +# output = yield getProcessOutput(str(self.job_cmd), +# [str(a) for a in args], +# env=dict((str(k),str(v)) for k,v in os.environ.items()), +# path=self.work_dir +# ) +# job_id = self.parse_job_id(output) +# self.notify_start(job_id) +# defer.returnValue(job_id) +# +# @inlineCallbacks +# def stop(self): +# args = [ +# 'cancel', +# self.job_id, +# '/scheduler:%s' % self.scheduler +# ] +# logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) +# try: +# # Twisted will raise DeprecationWarnings if we try to pass unicode to this +# output = yield getProcessOutput(str(self.job_cmd), +# [str(a) for a in args], +# env=dict((str(k),str(v)) for k,v in os.environ.iteritems()), +# path=self.work_dir +# ) +# except: +# output = 'The job already appears to be stoppped: %r' % self.job_id +# self.notify_stop(output) # Pass the output of the kill cmd +# defer.returnValue(output) +# +# +# class WindowsHPCControllerLauncher(WindowsHPCLauncher): +# +# job_file_name = Unicode(u'ipcontroller_job.xml', config=True) +# extra_args = List([], config=False) +# +# def write_job_file(self, n): +# job = IPControllerJob(config=self.config) +# +# t = IPControllerTask(config=self.config) +# # 
+#         # The task's work directory is *not* the actual work directory of
+#         # the controller. It is used as the base path for the stdout/stderr
+#         # files that the scheduler redirects to.
+#         t.work_directory = self.cluster_dir
+#         # Add the --cluster-dir argument set in self.start().
+#         t.controller_args.extend(self.extra_args)
+#         job.add_task(t)
+#
+#         logging.info("Writing job description file: %s" % self.job_file)
+#         job.write(self.job_file)
+#
+#     @property
+#     def job_file(self):
+#         return os.path.join(self.cluster_dir, self.job_file_name)
+#
+#     def start(self, cluster_dir):
+#         """Start the controller by cluster_dir."""
+#         self.extra_args = ['--cluster-dir', cluster_dir]
+#         self.cluster_dir = unicode(cluster_dir)
+#         return super(WindowsHPCControllerLauncher, self).start(1)
+#
+#
+# class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
+#
+#     job_file_name = Unicode(u'ipengineset_job.xml', config=True)
+#     extra_args = List([], config=False)
+#
+#     def write_job_file(self, n):
+#         job = IPEngineSetJob(config=self.config)
+#
+#         for i in range(n):
+#             t = IPEngineTask(config=self.config)
+#             # The task's work directory is *not* the actual work directory of
+#             # the engine. It is used as the base path for the stdout/stderr
+#             # files that the scheduler redirects to.
+#             t.work_directory = self.cluster_dir
+#             # Add the --cluster-dir argument set in self.start().
+#             t.engine_args.extend(self.extra_args)
+#             job.add_task(t)
+#
+#         logging.info("Writing job description file: %s" % self.job_file)
+#         job.write(self.job_file)
+#
+#     @property
+#     def job_file(self):
+#         return os.path.join(self.cluster_dir, self.job_file_name)
+#
+#     def start(self, n, cluster_dir):
+#         """Start n engines by cluster_dir."""
+#         self.extra_args = ['--cluster-dir', cluster_dir]
+#         self.cluster_dir = unicode(cluster_dir)
+#         return super(WindowsHPCEngineSetLauncher, self).start(n)
+#
+#
+# #-----------------------------------------------------------------------------
+# # Batch (PBS) system launchers
+# #-----------------------------------------------------------------------------
+#
+# # TODO: Get PBS launcher working again.
+#
+# class BatchSystemLauncher(BaseLauncher):
+#     """Launch an external process using a batch system.
+#
+#     This class is designed to work with UNIX batch systems like PBS, LSF,
+#     GridEngine, etc. The overall model is that there are different commands
+#     like qsub, qdel, etc. that handle the starting and stopping of the process.
+#
+#     This class also has the notion of a batch script. The ``batch_template``
+#     attribute can be set to a string that is a template for the batch script.
+#     This template is instantiated using Itpl. Thus the template can use
+#     ${n} for the number of instances. Subclasses can add additional variables
+#     to the template dict.
+#     """
+#
+#     # Subclasses must fill these in. See PBSEngineSet.
+#     # The name of the command line program used to submit jobs.
+#     submit_command = Str('', config=True)
+#     # The name of the command line program used to delete jobs.
+#     delete_command = Str('', config=True)
+#     # A regular expression used to get the job id from the output of the
+#     # submit_command.
+#     job_id_regexp = Str('', config=True)
+#     # The string that is the batch script template itself.
+#     batch_template = Str('', config=True)
+#     # The filename of the instantiated batch script.
+#     batch_file_name = Unicode(u'batch_script', config=True)
+#     # The full path to the instantiated batch script.
+#     batch_file = Unicode(u'')
+#
+#     def __init__(self, work_dir=u'.', config=None):
+#         super(BatchSystemLauncher, self).__init__(
+#             work_dir=work_dir, config=config
+#         )
+#         self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
+#         self.context = {}
+#
+#     def parse_job_id(self, output):
+#         """Take the output of the submit command and return the job id."""
+#         m = re.match(self.job_id_regexp, output)
+#         if m is not None:
+#             job_id = m.group()
+#         else:
+#             raise LauncherError("Job id couldn't be determined: %s" % output)
+#         self.job_id = job_id
+#         logging.info('Job started with job id: %r' % job_id)
+#         return job_id
+#
+#     def write_batch_script(self, n):
+#         """Instantiate and write the batch script to the work_dir."""
+#         self.context['n'] = n
+#         script_as_string = Itpl.itplns(self.batch_template, self.context)
+#         logging.info('Writing instantiated batch script: %s' % self.batch_file)
+#         f = open(self.batch_file, 'w')
+#         f.write(script_as_string)
+#         f.close()
+#
+#     @inlineCallbacks
+#     def start(self, n):
+#         """Start n copies of the process using a batch system."""
+#         self.write_batch_script(n)
+#         output = yield getProcessOutput(self.submit_command,
+#             [self.batch_file], env=os.environ)
+#         job_id = self.parse_job_id(output)
+#         self.notify_start(job_id)
+#         defer.returnValue(job_id)
+#
+#     @inlineCallbacks
+#     def stop(self):
+#         output = yield getProcessOutput(self.delete_command,
+#             [self.job_id], env=os.environ
+#         )
+#         self.notify_stop(output)  # Pass the output of the kill cmd
+#         defer.returnValue(output)
+#
+#
+# class PBSLauncher(BatchSystemLauncher):
+#     """A BatchSystemLauncher subclass for PBS."""
+#
+#     submit_command = Str('qsub', config=True)
+#     delete_command = Str('qdel', config=True)
+#     job_id_regexp = Str(r'\d+', config=True)
+#     batch_template = Str('', config=True)
+#     batch_file_name = Unicode(u'pbs_batch_script', config=True)
+#     batch_file = Unicode(u'')
+#
+#
+# class PBSControllerLauncher(PBSLauncher):
+#     """Launch a controller using PBS."""
+#
+#     batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
+#
+#     def start(self, cluster_dir):
+#         """Start the controller by profile or cluster_dir."""
+#         # Here we save cluster_dir in the context so it can be used in
+#         # the batch script template as ${cluster_dir}.
+#         self.context['cluster_dir'] = cluster_dir
+#         self.cluster_dir = unicode(cluster_dir)
+#         logging.info("Starting PBSControllerLauncher: %r" % self.args)
+#         return super(PBSControllerLauncher, self).start(1)
+#
+#
+# class PBSEngineSetLauncher(PBSLauncher):
+#
+#     batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
+#
+#     def start(self, n, cluster_dir):
+#         """Start n engines by profile or cluster_dir."""
+#         self.program_args.extend(['--cluster-dir', cluster_dir])
+#         self.cluster_dir = unicode(cluster_dir)
+#         logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
+#         return super(PBSEngineSetLauncher, self).start(n)
+
+
+#-----------------------------------------------------------------------------
+# A launcher for ipcluster itself!
+#-----------------------------------------------------------------------------
+
+
+class IPClusterLauncher(LocalProcessLauncher):
+    """Launch the ipcluster program in an external process."""
+
+    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
+    # Command line arguments to pass to ipcluster.
+    ipcluster_args = List(
+        ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
+    ipcluster_subcommand = Str('start')
+    ipcluster_n = Int(2)
+
+    def find_args(self):
+        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
+            ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
+
+    def start(self):
+        logging.info("Starting ipcluster: %r" % self.args)
+        return super(IPClusterLauncher, self).start()
+
diff --git a/setup.py b/setup.py
index 53f2144..2450384 100755
--- a/setup.py
+++ b/setup.py
@@ -218,12 +218,13 @@ if 'setuptools' in sys.modules:
         'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
         'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
         'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
-        'ipclusterz = IPython.zmq.parallel.ipcluster:main',
+        'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance',
         'iptest = IPython.testing.iptest:main',
         'irunner = IPython.lib.irunner:main'
     ]
 }
 setup_args['extras_require'] = dict(
+    zmq = 'pyzmq>=2.0.10',
     doc='Sphinx>=0.3',
     test='nose>=0.10.1',
     security='pyOpenSSL>=0.6'
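
Editor's note: the commented-out BatchSystemLauncher above renders batch_template
through Itpl with a per-launch context dict. A minimal sketch of that mechanism,
using the same Itpl.itplns call the disabled code uses; the template text and
context values below are illustrative only, not taken from this patch:

    from IPython.external import Itpl

    # Hypothetical PBS template; ${n} and ${cluster_dir} are the names the
    # (disabled) launchers place into self.context.
    batch_template = '#!/bin/sh\nmpiexec -n ${n} ipenginez --cluster-dir ${cluster_dir}\n'

    context = {'n': 4, 'cluster_dir': u'/path/to/cluster_dir'}  # illustrative values
    print(Itpl.itplns(batch_template, context))
    # -> #!/bin/sh
    #    mpiexec -n 4 ipenginez --cluster-dir /path/to/cluster_dir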
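
For reference, a usage sketch of the new IPClusterLauncher. The import path and
the value of ipcluster_cmd_argv are assumptions, as neither appears in this hunk:

    # Assumed module path for this patch's launcher code.
    from IPython.zmq.parallel.launcher import IPClusterLauncher

    launcher = IPClusterLauncher()
    launcher.ipcluster_n = 4      # configurable traitlet; defaults to 2
    print(launcher.find_args())
    # Assuming ipcluster_cmd_argv == ['ipclusterz'], this prints:
    # ['ipclusterz', 'start', '-n', '4', '--clean-logs',
    #  '--log-to-file', '--log-level', '40']   # '40' is str(logging.ERROR)
    launcher.start()              # logs the args, then spawns the subprocess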