Show More
@@ -0,0 +1,84 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | from __future__ import print_function | |
|
3 | import sys,os | |
|
4 | from subprocess import Popen, PIPE | |
|
5 | ||
|
6 | from entry_point import parse_url | |
|
7 | from controller import make_argument_parser | |
|
8 | ||
|
9 | def _filter_arg(flag, args): | |
|
10 | filtered = [] | |
|
11 | if flag in args: | |
|
12 | filtered.append(flag) | |
|
13 | idx = args.index(flag) | |
|
14 | if len(args) > idx+1: | |
|
15 | if not args[idx+1].startswith('-'): | |
|
16 | filtered.append(args[idx+1]) | |
|
17 | return filtered | |
|
18 | ||
|
19 | def filter_args(flags, args=sys.argv[1:]): | |
|
20 | filtered = [] | |
|
21 | for flag in flags: | |
|
22 | if isinstance(flag, (list,tuple)): | |
|
23 | for f in flag: | |
|
24 | filtered.extend(_filter_arg(f, args)) | |
|
25 | else: | |
|
26 | filtered.extend(_filter_arg(flag, args)) | |
|
27 | return filtered | |
|
28 | ||
|
29 | def _strip_arg(flag, args): | |
|
30 | while flag in args: | |
|
31 | idx = args.index(flag) | |
|
32 | args.pop(idx) | |
|
33 | if len(args) > idx: | |
|
34 | if not args[idx].startswith('-'): | |
|
35 | args.pop(idx) | |
|
36 | ||
|
37 | def strip_args(flags, args=sys.argv[1:]): | |
|
38 | args = list(args) | |
|
39 | for flag in flags: | |
|
40 | if isinstance(flag, (list,tuple)): | |
|
41 | for f in flag: | |
|
42 | _strip_arg(f, args) | |
|
43 | else: | |
|
44 | _strip_arg(flag, args) | |
|
45 | return args | |
|
46 | ||
|
47 | ||
|
48 | def launch_process(mod, args): | |
|
49 | """Launch a controller or engine in a subprocess.""" | |
|
50 | code = "from IPython.zmq.parallel.%s import main;main()"%mod | |
|
51 | arguments = [ sys.executable, '-c', code ] + args | |
|
52 | blackholew = file(os.devnull, 'w') | |
|
53 | blackholer = file(os.devnull, 'r') | |
|
54 | ||
|
55 | proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew) | |
|
56 | return proc | |
|
57 | ||
|
58 | def main(): | |
|
59 | parser = make_argument_parser() | |
|
60 | parser.add_argument('--n', '-n', type=int, default=1, | |
|
61 | help="The number of engines to start.") | |
|
62 | args = parser.parse_args() | |
|
63 | parse_url(args) | |
|
64 | ||
|
65 | controller_args = strip_args([('--n','-n')]) | |
|
66 | engine_args = filter_args(['--url', '--regport', '--logport', '--ip', | |
|
67 | '--transport','--loglevel','--packer'])+['--ident'] | |
|
68 | ||
|
69 | controller = launch_process('controller', controller_args) | |
|
70 | print("Launched Controller at %s"%args.url) | |
|
71 | engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ] | |
|
72 | print("%i Engines started"%args.n) | |
|
73 | ||
|
74 | def wait_quietly(p): | |
|
75 | try: | |
|
76 | p.wait() | |
|
77 | except KeyboardInterrupt: | |
|
78 | pass | |
|
79 | wait_quietly(controller) | |
|
80 | map(wait_quietly, engines) | |
|
81 | print ("Done") | |
|
82 | ||
|
83 | if __name__ == '__main__': | |
|
84 | main() No newline at end of file |
@@ -12,6 +12,8 b' This is the master object that handles connections from engines, clients, and' | |||
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | # Imports |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | from __future__ import print_function | |
|
16 | ||
|
15 | 17 | from datetime import datetime |
|
16 | 18 | import logging |
|
17 | 19 | |
@@ -24,8 +26,8 b' from IPython.zmq.log import logger # a Logger object' | |||
|
24 | 26 | from IPython.zmq.entry_point import bind_port |
|
25 | 27 | |
|
26 | 28 | from streamsession import Message, wrap_exception |
|
27 | from entry_point import (make_argument_parser, select_random_ports, split_ports, | |
|
28 | connect_logger) | |
|
29 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
|
30 | connect_logger, parse_url) | |
|
29 | 31 | # from messages import json # use the same import switches |
|
30 | 32 | |
|
31 | 33 | #----------------------------------------------------------------------------- |
@@ -267,7 +269,7 b' class Controller(object):' | |||
|
267 | 269 | """""" |
|
268 | 270 | logger.debug("registration::dispatch_register_request(%s)"%msg) |
|
269 | 271 | idents,msg = self.session.feed_identities(msg) |
|
270 | print idents,msg, len(msg) | |
|
272 | print (idents,msg, len(msg)) | |
|
271 | 273 | try: |
|
272 | 274 | msg = self.session.unpack_message(msg,content=True) |
|
273 | 275 | except Exception, e: |
@@ -470,7 +472,7 b' class Controller(object):' | |||
|
470 | 472 | logger.error("task::invalid task tracking message") |
|
471 | 473 | return |
|
472 | 474 | content = msg['content'] |
|
473 | print content | |
|
475 | print (content) | |
|
474 | 476 | msg_id = content['msg_id'] |
|
475 | 477 | engine_uuid = content['engine_id'] |
|
476 | 478 | for eid,queue_id in self.keytable.iteritems(): |
@@ -752,20 +754,9 b' class Controller(object):' | |||
|
752 | 754 | #-------------------- |
|
753 | 755 | # Entry Point |
|
754 | 756 | #-------------------- |
|
755 | ||
|
756 | def main(): | |
|
757 | import time | |
|
758 | from multiprocessing import Process | |
|
759 | ||
|
760 | from zmq.eventloop.zmqstream import ZMQStream | |
|
761 | from zmq.devices import ProcessMonitoredQueue | |
|
762 | from zmq.log import handlers | |
|
763 | ||
|
764 | import streamsession as session | |
|
765 | import heartmonitor | |
|
766 | from scheduler import launch_scheduler | |
|
767 | ||
|
768 | parser = make_argument_parser() | |
|
757 | def make_argument_parser(): | |
|
758 | """Make an argument parser""" | |
|
759 | parser = make_base_argument_parser() | |
|
769 | 760 | |
|
770 | 761 | parser.add_argument('--client', type=int, metavar='PORT', default=0, |
|
771 | 762 | help='set the XREP port for clients [default: random]') |
@@ -787,14 +778,24 b' def main():' | |||
|
787 | 778 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
788 | 779 | help='select the task scheduler [default: pure ZMQ]') |
|
789 | 780 | |
|
790 | args = parser.parse_args() | |
|
781 | return parser | |
|
791 | 782 | |
|
792 | if args.url: | |
|
793 | args.transport,iface = args.url.split('://') | |
|
794 | iface = iface.split(':') | |
|
795 | args.ip = iface[0] | |
|
796 | if iface[1]: | |
|
797 | args.regport = iface[1] | |
|
783 | def main(): | |
|
784 | import time | |
|
785 | from multiprocessing import Process | |
|
786 | ||
|
787 | from zmq.eventloop.zmqstream import ZMQStream | |
|
788 | from zmq.devices import ProcessMonitoredQueue | |
|
789 | from zmq.log import handlers | |
|
790 | ||
|
791 | import streamsession as session | |
|
792 | import heartmonitor | |
|
793 | from scheduler import launch_scheduler | |
|
794 | ||
|
795 | parser = make_argument_parser() | |
|
796 | ||
|
797 | args = parser.parse_args() | |
|
798 | parse_url(args) | |
|
798 | 799 | |
|
799 | 800 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
800 | 801 | |
@@ -891,7 +892,7 b' def main():' | |||
|
891 | 892 | q.start() |
|
892 | 893 | else: |
|
893 | 894 | sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) |
|
894 | print sargs | |
|
895 | print (sargs) | |
|
895 | 896 | p = Process(target=launch_scheduler, args=sargs) |
|
896 | 897 | p.daemon=True |
|
897 | 898 | p.start() |
@@ -915,5 +916,8 b' def main():' | |||
|
915 | 916 | 'notification': iface%nport |
|
916 | 917 | } |
|
917 | 918 | con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs) |
|
919 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
|
918 | 920 | loop.start() |
|
919 | ||
|
921 | ||
|
922 | if __name__ == '__main__': | |
|
923 | main() |
@@ -3,6 +3,7 b'' | |||
|
3 | 3 | it handles registration, etc. and launches a kernel |
|
4 | 4 | connected to the Controller's queue(s). |
|
5 | 5 | """ |
|
6 | from __future__ import print_function | |
|
6 | 7 | import sys |
|
7 | 8 | import time |
|
8 | 9 | import traceback |
@@ -16,7 +17,7 b' from streamsession import Message, StreamSession' | |||
|
16 | 17 | from client import Client |
|
17 | 18 | import streamkernel as kernel |
|
18 | 19 | import heartmonitor |
|
19 | from entry_point import make_argument_parser, connect_logger | |
|
20 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
|
20 | 21 | # import taskthread |
|
21 | 22 | # from log import logger |
|
22 | 23 | |
@@ -72,7 +73,7 b' class Engine(object):' | |||
|
72 | 73 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
73 | 74 | |
|
74 | 75 | task_addr = msg.content.task |
|
75 | print task_addr | |
|
76 | print (task_addr) | |
|
76 | 77 | if task_addr: |
|
77 | 78 | # task as stream: |
|
78 | 79 | task = self.context.socket(zmq.PAIR) |
@@ -103,7 +104,7 b' class Engine(object):' | |||
|
103 | 104 | |
|
104 | 105 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
105 | 106 | |
|
106 | print msg | |
|
107 | print (msg) | |
|
107 | 108 | |
|
108 | 109 | def unregister(self): |
|
109 | 110 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
@@ -111,24 +112,20 b' class Engine(object):' | |||
|
111 | 112 | sys.exit(0) |
|
112 | 113 | |
|
113 | 114 | def start(self): |
|
114 | print "registering" | |
|
115 | print ("registering") | |
|
115 | 116 | self.register() |
|
116 | 117 | |
|
117 | 118 | |
|
118 | 119 | def main(): |
|
119 | 120 | |
|
120 | parser = make_argument_parser() | |
|
121 | parser = make_base_argument_parser() | |
|
121 | 122 | |
|
122 | 123 | args = parser.parse_args() |
|
123 | 124 | |
|
124 | if args.url: | |
|
125 | args.transport,iface = args.url.split('://') | |
|
126 | iface = iface.split(':') | |
|
127 | args.ip = iface[0] | |
|
128 | if iface[1]: | |
|
129 | args.regport = iface[1] | |
|
125 | parse_url(args) | |
|
130 | 126 | |
|
131 | 127 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
128 | ||
|
132 | 129 | loop = ioloop.IOLoop.instance() |
|
133 | 130 | session = StreamSession() |
|
134 | 131 | ctx = zmq.Context() |
@@ -137,8 +134,8 b' def main():' | |||
|
137 | 134 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
138 | 135 | |
|
139 | 136 | reg_conn = iface % args.regport |
|
140 | print reg_conn | |
|
141 |
print |
|
|
137 | print (reg_conn) | |
|
138 | print ("Starting the engine...", file=sys.__stderr__) | |
|
142 | 139 | |
|
143 | 140 | reg = ctx.socket(zmq.PAIR) |
|
144 | 141 | reg.connect(reg_conn) |
@@ -39,9 +39,21 b' def select_random_ports(n):' | |||
|
39 | 39 | sock.close() |
|
40 | 40 | ports[i] = port |
|
41 | 41 | return ports |
|
42 | ||
|
43 | def parse_url(args): | |
|
44 | if args.url: | |
|
45 | iface = args.url.split('://',1) | |
|
46 | if len(args) == 2: | |
|
47 | args.transport,iface = iface | |
|
48 | iface = iface.split(':') | |
|
49 | args.ip = iface[0] | |
|
50 | if iface[1]: | |
|
51 | args.regport = iface[1] | |
|
52 | args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) | |
|
53 | ||
|
42 | 54 | |
|
43 | 55 | |
|
44 | def make_argument_parser(): | |
|
56 | def make_base_argument_parser(): | |
|
45 | 57 | """ Creates an ArgumentParser for the generic arguments supported by all |
|
46 | 58 | ipcluster entry points. |
|
47 | 59 | """ |
@@ -58,6 +70,9 b' def make_argument_parser():' | |||
|
58 | 70 | help='set the log level [default: DEBUG]') |
|
59 | 71 | parser.add_argument('--ident', type=str, |
|
60 | 72 | help='set the ZMQ identity [default: random]') |
|
73 | parser.add_argument('--packer', type=str, default='json', | |
|
74 | choices=['json','pickle'], | |
|
75 | help='set the message format method [default: json]') | |
|
61 | 76 | parser.add_argument('--url', type=str, |
|
62 | 77 | help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') |
|
63 | 78 |
@@ -217,6 +217,7 b" if 'setuptools' in sys.modules:" | |||
|
217 | 217 | 'pycolor = IPython.utils.PyColorize:main', |
|
218 | 218 | 'ipcontrollerz = IPython.zmq.parallel.controller:main', |
|
219 | 219 | 'ipenginez = IPython.zmq.parallel.engine:main', |
|
220 | 'ipclusterz = IPython.zmq.parallel.ipcluster:main', | |
|
220 | 221 | 'iptest = IPython.testing.iptest:main', |
|
221 | 222 | 'irunner = IPython.lib.irunner:main' |
|
222 | 223 | ] |
General Comments 0
You need to be logged in to leave comments.
Login now