Show More
@@ -0,0 +1,84 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | from __future__ import print_function | |||
|
3 | import sys,os | |||
|
4 | from subprocess import Popen, PIPE | |||
|
5 | ||||
|
6 | from entry_point import parse_url | |||
|
7 | from controller import make_argument_parser | |||
|
8 | ||||
|
9 | def _filter_arg(flag, args): | |||
|
10 | filtered = [] | |||
|
11 | if flag in args: | |||
|
12 | filtered.append(flag) | |||
|
13 | idx = args.index(flag) | |||
|
14 | if len(args) > idx+1: | |||
|
15 | if not args[idx+1].startswith('-'): | |||
|
16 | filtered.append(args[idx+1]) | |||
|
17 | return filtered | |||
|
18 | ||||
|
19 | def filter_args(flags, args=sys.argv[1:]): | |||
|
20 | filtered = [] | |||
|
21 | for flag in flags: | |||
|
22 | if isinstance(flag, (list,tuple)): | |||
|
23 | for f in flag: | |||
|
24 | filtered.extend(_filter_arg(f, args)) | |||
|
25 | else: | |||
|
26 | filtered.extend(_filter_arg(flag, args)) | |||
|
27 | return filtered | |||
|
28 | ||||
|
29 | def _strip_arg(flag, args): | |||
|
30 | while flag in args: | |||
|
31 | idx = args.index(flag) | |||
|
32 | args.pop(idx) | |||
|
33 | if len(args) > idx: | |||
|
34 | if not args[idx].startswith('-'): | |||
|
35 | args.pop(idx) | |||
|
36 | ||||
|
37 | def strip_args(flags, args=sys.argv[1:]): | |||
|
38 | args = list(args) | |||
|
39 | for flag in flags: | |||
|
40 | if isinstance(flag, (list,tuple)): | |||
|
41 | for f in flag: | |||
|
42 | _strip_arg(f, args) | |||
|
43 | else: | |||
|
44 | _strip_arg(flag, args) | |||
|
45 | return args | |||
|
46 | ||||
|
47 | ||||
|
48 | def launch_process(mod, args): | |||
|
49 | """Launch a controller or engine in a subprocess.""" | |||
|
50 | code = "from IPython.zmq.parallel.%s import main;main()"%mod | |||
|
51 | arguments = [ sys.executable, '-c', code ] + args | |||
|
52 | blackholew = file(os.devnull, 'w') | |||
|
53 | blackholer = file(os.devnull, 'r') | |||
|
54 | ||||
|
55 | proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew) | |||
|
56 | return proc | |||
|
57 | ||||
|
58 | def main(): | |||
|
59 | parser = make_argument_parser() | |||
|
60 | parser.add_argument('--n', '-n', type=int, default=1, | |||
|
61 | help="The number of engines to start.") | |||
|
62 | args = parser.parse_args() | |||
|
63 | parse_url(args) | |||
|
64 | ||||
|
65 | controller_args = strip_args([('--n','-n')]) | |||
|
66 | engine_args = filter_args(['--url', '--regport', '--logport', '--ip', | |||
|
67 | '--transport','--loglevel','--packer'])+['--ident'] | |||
|
68 | ||||
|
69 | controller = launch_process('controller', controller_args) | |||
|
70 | print("Launched Controller at %s"%args.url) | |||
|
71 | engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ] | |||
|
72 | print("%i Engines started"%args.n) | |||
|
73 | ||||
|
74 | def wait_quietly(p): | |||
|
75 | try: | |||
|
76 | p.wait() | |||
|
77 | except KeyboardInterrupt: | |||
|
78 | pass | |||
|
79 | wait_quietly(controller) | |||
|
80 | map(wait_quietly, engines) | |||
|
81 | print ("Done") | |||
|
82 | ||||
|
83 | if __name__ == '__main__': | |||
|
84 | main() No newline at end of file |
@@ -12,6 +12,8 b' This is the master object that handles connections from engines, clients, and' | |||||
12 | #----------------------------------------------------------------------------- |
|
12 | #----------------------------------------------------------------------------- | |
13 | # Imports |
|
13 | # Imports | |
14 | #----------------------------------------------------------------------------- |
|
14 | #----------------------------------------------------------------------------- | |
|
15 | from __future__ import print_function | |||
|
16 | ||||
15 | from datetime import datetime |
|
17 | from datetime import datetime | |
16 | import logging |
|
18 | import logging | |
17 |
|
19 | |||
@@ -24,8 +26,8 b' from IPython.zmq.log import logger # a Logger object' | |||||
24 | from IPython.zmq.entry_point import bind_port |
|
26 | from IPython.zmq.entry_point import bind_port | |
25 |
|
27 | |||
26 | from streamsession import Message, wrap_exception |
|
28 | from streamsession import Message, wrap_exception | |
27 | from entry_point import (make_argument_parser, select_random_ports, split_ports, |
|
29 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
28 | connect_logger) |
|
30 | connect_logger, parse_url) | |
29 | # from messages import json # use the same import switches |
|
31 | # from messages import json # use the same import switches | |
30 |
|
32 | |||
31 | #----------------------------------------------------------------------------- |
|
33 | #----------------------------------------------------------------------------- | |
@@ -267,7 +269,7 b' class Controller(object):' | |||||
267 | """""" |
|
269 | """""" | |
268 | logger.debug("registration::dispatch_register_request(%s)"%msg) |
|
270 | logger.debug("registration::dispatch_register_request(%s)"%msg) | |
269 | idents,msg = self.session.feed_identities(msg) |
|
271 | idents,msg = self.session.feed_identities(msg) | |
270 | print idents,msg, len(msg) |
|
272 | print (idents,msg, len(msg)) | |
271 | try: |
|
273 | try: | |
272 | msg = self.session.unpack_message(msg,content=True) |
|
274 | msg = self.session.unpack_message(msg,content=True) | |
273 | except Exception, e: |
|
275 | except Exception, e: | |
@@ -470,7 +472,7 b' class Controller(object):' | |||||
470 | logger.error("task::invalid task tracking message") |
|
472 | logger.error("task::invalid task tracking message") | |
471 | return |
|
473 | return | |
472 | content = msg['content'] |
|
474 | content = msg['content'] | |
473 | print content |
|
475 | print (content) | |
474 | msg_id = content['msg_id'] |
|
476 | msg_id = content['msg_id'] | |
475 | engine_uuid = content['engine_id'] |
|
477 | engine_uuid = content['engine_id'] | |
476 | for eid,queue_id in self.keytable.iteritems(): |
|
478 | for eid,queue_id in self.keytable.iteritems(): | |
@@ -752,20 +754,9 b' class Controller(object):' | |||||
752 | #-------------------- |
|
754 | #-------------------- | |
753 | # Entry Point |
|
755 | # Entry Point | |
754 | #-------------------- |
|
756 | #-------------------- | |
755 |
|
757 | def make_argument_parser(): | ||
756 | def main(): |
|
758 | """Make an argument parser""" | |
757 | import time |
|
759 | parser = make_base_argument_parser() | |
758 | from multiprocessing import Process |
|
|||
759 |
|
||||
760 | from zmq.eventloop.zmqstream import ZMQStream |
|
|||
761 | from zmq.devices import ProcessMonitoredQueue |
|
|||
762 | from zmq.log import handlers |
|
|||
763 |
|
||||
764 | import streamsession as session |
|
|||
765 | import heartmonitor |
|
|||
766 | from scheduler import launch_scheduler |
|
|||
767 |
|
||||
768 | parser = make_argument_parser() |
|
|||
769 |
|
760 | |||
770 | parser.add_argument('--client', type=int, metavar='PORT', default=0, |
|
761 | parser.add_argument('--client', type=int, metavar='PORT', default=0, | |
771 | help='set the XREP port for clients [default: random]') |
|
762 | help='set the XREP port for clients [default: random]') | |
@@ -787,14 +778,24 b' def main():' | |||||
787 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
778 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], | |
788 | help='select the task scheduler [default: pure ZMQ]') |
|
779 | help='select the task scheduler [default: pure ZMQ]') | |
789 |
|
780 | |||
790 | args = parser.parse_args() |
|
781 | return parser | |
791 |
|
782 | |||
792 | if args.url: |
|
783 | def main(): | |
793 | args.transport,iface = args.url.split('://') |
|
784 | import time | |
794 | iface = iface.split(':') |
|
785 | from multiprocessing import Process | |
795 | args.ip = iface[0] |
|
786 | ||
796 | if iface[1]: |
|
787 | from zmq.eventloop.zmqstream import ZMQStream | |
797 | args.regport = iface[1] |
|
788 | from zmq.devices import ProcessMonitoredQueue | |
|
789 | from zmq.log import handlers | |||
|
790 | ||||
|
791 | import streamsession as session | |||
|
792 | import heartmonitor | |||
|
793 | from scheduler import launch_scheduler | |||
|
794 | ||||
|
795 | parser = make_argument_parser() | |||
|
796 | ||||
|
797 | args = parser.parse_args() | |||
|
798 | parse_url(args) | |||
798 |
|
799 | |||
799 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
800 | iface="%s://%s"%(args.transport,args.ip)+':%i' | |
800 |
|
801 | |||
@@ -891,7 +892,7 b' def main():' | |||||
891 | q.start() |
|
892 | q.start() | |
892 | else: |
|
893 | else: | |
893 | sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) |
|
894 | sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) | |
894 | print sargs |
|
895 | print (sargs) | |
895 | p = Process(target=launch_scheduler, args=sargs) |
|
896 | p = Process(target=launch_scheduler, args=sargs) | |
896 | p.daemon=True |
|
897 | p.daemon=True | |
897 | p.start() |
|
898 | p.start() | |
@@ -915,5 +916,8 b' def main():' | |||||
915 | 'notification': iface%nport |
|
916 | 'notification': iface%nport | |
916 | } |
|
917 | } | |
917 | con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs) |
|
918 | con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs) | |
|
919 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |||
918 | loop.start() |
|
920 | loop.start() | |
919 |
|
921 | |||
|
922 | if __name__ == '__main__': | |||
|
923 | main() |
@@ -3,6 +3,7 b'' | |||||
3 | it handles registration, etc. and launches a kernel |
|
3 | it handles registration, etc. and launches a kernel | |
4 | connected to the Controller's queue(s). |
|
4 | connected to the Controller's queue(s). | |
5 | """ |
|
5 | """ | |
|
6 | from __future__ import print_function | |||
6 | import sys |
|
7 | import sys | |
7 | import time |
|
8 | import time | |
8 | import traceback |
|
9 | import traceback | |
@@ -16,7 +17,7 b' from streamsession import Message, StreamSession' | |||||
16 | from client import Client |
|
17 | from client import Client | |
17 | import streamkernel as kernel |
|
18 | import streamkernel as kernel | |
18 | import heartmonitor |
|
19 | import heartmonitor | |
19 | from entry_point import make_argument_parser, connect_logger |
|
20 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
20 | # import taskthread |
|
21 | # import taskthread | |
21 | # from log import logger |
|
22 | # from log import logger | |
22 |
|
23 | |||
@@ -72,7 +73,7 b' class Engine(object):' | |||||
72 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
73 | self.control = zmqstream.ZMQStream(control, self.loop) | |
73 |
|
74 | |||
74 | task_addr = msg.content.task |
|
75 | task_addr = msg.content.task | |
75 | print task_addr |
|
76 | print (task_addr) | |
76 | if task_addr: |
|
77 | if task_addr: | |
77 | # task as stream: |
|
78 | # task as stream: | |
78 | task = self.context.socket(zmq.PAIR) |
|
79 | task = self.context.socket(zmq.PAIR) | |
@@ -103,7 +104,7 b' class Engine(object):' | |||||
103 |
|
104 | |||
104 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
105 | # logger.info("engine::completed registration with id %s"%self.session.username) | |
105 |
|
106 | |||
106 | print msg |
|
107 | print (msg) | |
107 |
|
108 | |||
108 | def unregister(self): |
|
109 | def unregister(self): | |
109 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
110 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
@@ -111,24 +112,20 b' class Engine(object):' | |||||
111 | sys.exit(0) |
|
112 | sys.exit(0) | |
112 |
|
113 | |||
113 | def start(self): |
|
114 | def start(self): | |
114 | print "registering" |
|
115 | print ("registering") | |
115 | self.register() |
|
116 | self.register() | |
116 |
|
117 | |||
117 |
|
118 | |||
118 | def main(): |
|
119 | def main(): | |
119 |
|
120 | |||
120 | parser = make_argument_parser() |
|
121 | parser = make_base_argument_parser() | |
121 |
|
122 | |||
122 | args = parser.parse_args() |
|
123 | args = parser.parse_args() | |
123 |
|
124 | |||
124 | if args.url: |
|
125 | parse_url(args) | |
125 | args.transport,iface = args.url.split('://') |
|
|||
126 | iface = iface.split(':') |
|
|||
127 | args.ip = iface[0] |
|
|||
128 | if iface[1]: |
|
|||
129 | args.regport = iface[1] |
|
|||
130 |
|
126 | |||
131 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
127 | iface="%s://%s"%(args.transport,args.ip)+':%i' | |
|
128 | ||||
132 | loop = ioloop.IOLoop.instance() |
|
129 | loop = ioloop.IOLoop.instance() | |
133 | session = StreamSession() |
|
130 | session = StreamSession() | |
134 | ctx = zmq.Context() |
|
131 | ctx = zmq.Context() | |
@@ -137,8 +134,8 b' def main():' | |||||
137 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
134 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) | |
138 |
|
135 | |||
139 | reg_conn = iface % args.regport |
|
136 | reg_conn = iface % args.regport | |
140 | print reg_conn |
|
137 | print (reg_conn) | |
141 |
print |
|
138 | print ("Starting the engine...", file=sys.__stderr__) | |
142 |
|
139 | |||
143 | reg = ctx.socket(zmq.PAIR) |
|
140 | reg = ctx.socket(zmq.PAIR) | |
144 | reg.connect(reg_conn) |
|
141 | reg.connect(reg_conn) |
@@ -39,9 +39,21 b' def select_random_ports(n):' | |||||
39 | sock.close() |
|
39 | sock.close() | |
40 | ports[i] = port |
|
40 | ports[i] = port | |
41 | return ports |
|
41 | return ports | |
|
42 | ||||
|
43 | def parse_url(args): | |||
|
44 | if args.url: | |||
|
45 | iface = args.url.split('://',1) | |||
|
46 | if len(args) == 2: | |||
|
47 | args.transport,iface = iface | |||
|
48 | iface = iface.split(':') | |||
|
49 | args.ip = iface[0] | |||
|
50 | if iface[1]: | |||
|
51 | args.regport = iface[1] | |||
|
52 | args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) | |||
|
53 | ||||
42 |
|
54 | |||
43 |
|
55 | |||
44 | def make_argument_parser(): |
|
56 | def make_base_argument_parser(): | |
45 | """ Creates an ArgumentParser for the generic arguments supported by all |
|
57 | """ Creates an ArgumentParser for the generic arguments supported by all | |
46 | ipcluster entry points. |
|
58 | ipcluster entry points. | |
47 | """ |
|
59 | """ | |
@@ -58,6 +70,9 b' def make_argument_parser():' | |||||
58 | help='set the log level [default: DEBUG]') |
|
70 | help='set the log level [default: DEBUG]') | |
59 | parser.add_argument('--ident', type=str, |
|
71 | parser.add_argument('--ident', type=str, | |
60 | help='set the ZMQ identity [default: random]') |
|
72 | help='set the ZMQ identity [default: random]') | |
|
73 | parser.add_argument('--packer', type=str, default='json', | |||
|
74 | choices=['json','pickle'], | |||
|
75 | help='set the message format method [default: json]') | |||
61 | parser.add_argument('--url', type=str, |
|
76 | parser.add_argument('--url', type=str, | |
62 | help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') |
|
77 | help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') | |
63 |
|
78 |
@@ -217,6 +217,7 b" if 'setuptools' in sys.modules:" | |||||
217 | 'pycolor = IPython.utils.PyColorize:main', |
|
217 | 'pycolor = IPython.utils.PyColorize:main', | |
218 | 'ipcontrollerz = IPython.zmq.parallel.controller:main', |
|
218 | 'ipcontrollerz = IPython.zmq.parallel.controller:main', | |
219 | 'ipenginez = IPython.zmq.parallel.engine:main', |
|
219 | 'ipenginez = IPython.zmq.parallel.engine:main', | |
|
220 | 'ipclusterz = IPython.zmq.parallel.ipcluster:main', | |||
220 | 'iptest = IPython.testing.iptest:main', |
|
221 | 'iptest = IPython.testing.iptest:main', | |
221 | 'irunner = IPython.lib.irunner:main' |
|
222 | 'irunner = IPython.lib.irunner:main' | |
222 | ] |
|
223 | ] |
General Comments 0
You need to be logged in to leave comments.
Login now