diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index 8238bc9..1f0507f 100644 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -12,6 +12,8 @@ This is the master object that handles connections from engines, clients, and #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- +from __future__ import print_function + from datetime import datetime import logging @@ -24,8 +26,8 @@ from IPython.zmq.log import logger # a Logger object from IPython.zmq.entry_point import bind_port from streamsession import Message, wrap_exception -from entry_point import (make_argument_parser, select_random_ports, split_ports, - connect_logger) +from entry_point import (make_base_argument_parser, select_random_ports, split_ports, + connect_logger, parse_url) # from messages import json # use the same import switches #----------------------------------------------------------------------------- @@ -267,7 +269,7 @@ class Controller(object): """""" logger.debug("registration::dispatch_register_request(%s)"%msg) idents,msg = self.session.feed_identities(msg) - print idents,msg, len(msg) + print (idents,msg, len(msg)) try: msg = self.session.unpack_message(msg,content=True) except Exception, e: @@ -470,7 +472,7 @@ class Controller(object): logger.error("task::invalid task tracking message") return content = msg['content'] - print content + print (content) msg_id = content['msg_id'] engine_uuid = content['engine_id'] for eid,queue_id in self.keytable.iteritems(): @@ -752,20 +754,9 @@ class Controller(object): #-------------------- # Entry Point #-------------------- - -def main(): - import time - from multiprocessing import Process - - from zmq.eventloop.zmqstream import ZMQStream - from zmq.devices import ProcessMonitoredQueue - from zmq.log import handlers - - import streamsession as session - import heartmonitor - from scheduler import launch_scheduler - - parser = make_argument_parser() +def make_argument_parser(): + """Make an argument parser""" + parser = make_base_argument_parser() parser.add_argument('--client', type=int, metavar='PORT', default=0, help='set the XREP port for clients [default: random]') @@ -787,14 +778,24 @@ def main(): choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], help='select the task scheduler [default: pure ZMQ]') - args = parser.parse_args() + return parser - if args.url: - args.transport,iface = args.url.split('://') - iface = iface.split(':') - args.ip = iface[0] - if iface[1]: - args.regport = iface[1] +def main(): + import time + from multiprocessing import Process + + from zmq.eventloop.zmqstream import ZMQStream + from zmq.devices import ProcessMonitoredQueue + from zmq.log import handlers + + import streamsession as session + import heartmonitor + from scheduler import launch_scheduler + + parser = make_argument_parser() + + args = parser.parse_args() + parse_url(args) iface="%s://%s"%(args.transport,args.ip)+':%i' @@ -891,7 +892,7 @@ def main(): q.start() else: sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) - print sargs + print (sargs) p = Process(target=launch_scheduler, args=sargs) p.daemon=True p.start() @@ -915,5 +916,8 @@ def main(): 'notification': iface%nport } con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs) + dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) loop.start() - + +if __name__ == '__main__': + main() diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index 10ccc77..92af2a5 100644 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -3,6 +3,7 @@ it handles registration, etc. and launches a kernel connected to the Controller's queue(s). """ +from __future__ import print_function import sys import time import traceback @@ -16,7 +17,7 @@ from streamsession import Message, StreamSession from client import Client import streamkernel as kernel import heartmonitor -from entry_point import make_argument_parser, connect_logger +from entry_point import make_base_argument_parser, connect_logger, parse_url # import taskthread # from log import logger @@ -72,7 +73,7 @@ class Engine(object): self.control = zmqstream.ZMQStream(control, self.loop) task_addr = msg.content.task - print task_addr + print (task_addr) if task_addr: # task as stream: task = self.context.socket(zmq.PAIR) @@ -103,7 +104,7 @@ class Engine(object): # logger.info("engine::completed registration with id %s"%self.session.username) - print msg + print (msg) def unregister(self): self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) @@ -111,24 +112,20 @@ class Engine(object): sys.exit(0) def start(self): - print "registering" + print ("registering") self.register() def main(): - parser = make_argument_parser() + parser = make_base_argument_parser() args = parser.parse_args() - if args.url: - args.transport,iface = args.url.split('://') - iface = iface.split(':') - args.ip = iface[0] - if iface[1]: - args.regport = iface[1] + parse_url(args) iface="%s://%s"%(args.transport,args.ip)+':%i' + loop = ioloop.IOLoop.instance() session = StreamSession() ctx = zmq.Context() @@ -137,8 +134,8 @@ def main(): connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) reg_conn = iface % args.regport - print reg_conn - print >>sys.__stdout__, "Starting the engine..." + print (reg_conn) + print ("Starting the engine...", file=sys.__stderr__) reg = ctx.socket(zmq.PAIR) reg.connect(reg_conn) diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index 001fe25..8610de5 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -39,9 +39,21 @@ def select_random_ports(n): sock.close() ports[i] = port return ports + +def parse_url(args): + if args.url: + iface = args.url.split('://',1) + if len(args) == 2: + args.transport,iface = iface + iface = iface.split(':') + args.ip = iface[0] + if iface[1]: + args.regport = iface[1] + args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) + -def make_argument_parser(): +def make_base_argument_parser(): """ Creates an ArgumentParser for the generic arguments supported by all ipcluster entry points. """ @@ -58,6 +70,9 @@ def make_argument_parser(): help='set the log level [default: DEBUG]') parser.add_argument('--ident', type=str, help='set the ZMQ identity [default: random]') + parser.add_argument('--packer', type=str, default='json', + choices=['json','pickle'], + help='set the message format method [default: json]') parser.add_argument('--url', type=str, help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') diff --git a/IPython/zmq/parallel/ipcluster.py b/IPython/zmq/parallel/ipcluster.py new file mode 100644 index 0000000..150cba3 --- /dev/null +++ b/IPython/zmq/parallel/ipcluster.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +from __future__ import print_function +import sys,os +from subprocess import Popen, PIPE + +from entry_point import parse_url +from controller import make_argument_parser + +def _filter_arg(flag, args): + filtered = [] + if flag in args: + filtered.append(flag) + idx = args.index(flag) + if len(args) > idx+1: + if not args[idx+1].startswith('-'): + filtered.append(args[idx+1]) + return filtered + +def filter_args(flags, args=sys.argv[1:]): + filtered = [] + for flag in flags: + if isinstance(flag, (list,tuple)): + for f in flag: + filtered.extend(_filter_arg(f, args)) + else: + filtered.extend(_filter_arg(flag, args)) + return filtered + +def _strip_arg(flag, args): + while flag in args: + idx = args.index(flag) + args.pop(idx) + if len(args) > idx: + if not args[idx].startswith('-'): + args.pop(idx) + +def strip_args(flags, args=sys.argv[1:]): + args = list(args) + for flag in flags: + if isinstance(flag, (list,tuple)): + for f in flag: + _strip_arg(f, args) + else: + _strip_arg(flag, args) + return args + + +def launch_process(mod, args): + """Launch a controller or engine in a subprocess.""" + code = "from IPython.zmq.parallel.%s import main;main()"%mod + arguments = [ sys.executable, '-c', code ] + args + blackholew = file(os.devnull, 'w') + blackholer = file(os.devnull, 'r') + + proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew) + return proc + +def main(): + parser = make_argument_parser() + parser.add_argument('--n', '-n', type=int, default=1, + help="The number of engines to start.") + args = parser.parse_args() + parse_url(args) + + controller_args = strip_args([('--n','-n')]) + engine_args = filter_args(['--url', '--regport', '--logport', '--ip', + '--transport','--loglevel','--packer'])+['--ident'] + + controller = launch_process('controller', controller_args) + print("Launched Controller at %s"%args.url) + engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ] + print("%i Engines started"%args.n) + + def wait_quietly(p): + try: + p.wait() + except KeyboardInterrupt: + pass + wait_quietly(controller) + map(wait_quietly, engines) + print ("Done") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/setup.py b/setup.py index fe18c0b..2d83b9a 100755 --- a/setup.py +++ b/setup.py @@ -217,6 +217,7 @@ if 'setuptools' in sys.modules: 'pycolor = IPython.utils.PyColorize:main', 'ipcontrollerz = IPython.zmq.parallel.controller:main', 'ipenginez = IPython.zmq.parallel.engine:main', + 'ipclusterz = IPython.zmq.parallel.ipcluster:main', 'iptest = IPython.testing.iptest:main', 'irunner = IPython.lib.irunner:main' ]