##// END OF EJS Templates
added simple cluster entry point
MinRK -
Show More
@@ -0,0 +1,84 b''
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys,os
4 from subprocess import Popen, PIPE
5
6 from entry_point import parse_url
7 from controller import make_argument_parser
8
9 def _filter_arg(flag, args):
10 filtered = []
11 if flag in args:
12 filtered.append(flag)
13 idx = args.index(flag)
14 if len(args) > idx+1:
15 if not args[idx+1].startswith('-'):
16 filtered.append(args[idx+1])
17 return filtered
18
19 def filter_args(flags, args=sys.argv[1:]):
20 filtered = []
21 for flag in flags:
22 if isinstance(flag, (list,tuple)):
23 for f in flag:
24 filtered.extend(_filter_arg(f, args))
25 else:
26 filtered.extend(_filter_arg(flag, args))
27 return filtered
28
29 def _strip_arg(flag, args):
30 while flag in args:
31 idx = args.index(flag)
32 args.pop(idx)
33 if len(args) > idx:
34 if not args[idx].startswith('-'):
35 args.pop(idx)
36
37 def strip_args(flags, args=sys.argv[1:]):
38 args = list(args)
39 for flag in flags:
40 if isinstance(flag, (list,tuple)):
41 for f in flag:
42 _strip_arg(f, args)
43 else:
44 _strip_arg(flag, args)
45 return args
46
47
48 def launch_process(mod, args):
49 """Launch a controller or engine in a subprocess."""
50 code = "from IPython.zmq.parallel.%s import main;main()"%mod
51 arguments = [ sys.executable, '-c', code ] + args
52 blackholew = file(os.devnull, 'w')
53 blackholer = file(os.devnull, 'r')
54
55 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew)
56 return proc
57
58 def main():
59 parser = make_argument_parser()
60 parser.add_argument('--n', '-n', type=int, default=1,
61 help="The number of engines to start.")
62 args = parser.parse_args()
63 parse_url(args)
64
65 controller_args = strip_args([('--n','-n')])
66 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
67 '--transport','--loglevel','--packer'])+['--ident']
68
69 controller = launch_process('controller', controller_args)
70 print("Launched Controller at %s"%args.url)
71 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
72 print("%i Engines started"%args.n)
73
74 def wait_quietly(p):
75 try:
76 p.wait()
77 except KeyboardInterrupt:
78 pass
79 wait_quietly(controller)
80 map(wait_quietly, engines)
81 print ("Done")
82
83 if __name__ == '__main__':
84 main() No newline at end of file
@@ -12,6 +12,8 b' This is the master object that handles connections from engines, clients, and'
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
15 17 from datetime import datetime
16 18 import logging
17 19
@@ -24,8 +26,8 b' from IPython.zmq.log import logger # a Logger object'
24 26 from IPython.zmq.entry_point import bind_port
25 27
26 28 from streamsession import Message, wrap_exception
27 from entry_point import (make_argument_parser, select_random_ports, split_ports,
28 connect_logger)
29 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 connect_logger, parse_url)
29 31 # from messages import json # use the same import switches
30 32
31 33 #-----------------------------------------------------------------------------
@@ -267,7 +269,7 b' class Controller(object):'
267 269 """"""
268 270 logger.debug("registration::dispatch_register_request(%s)"%msg)
269 271 idents,msg = self.session.feed_identities(msg)
270 print idents,msg, len(msg)
272 print (idents,msg, len(msg))
271 273 try:
272 274 msg = self.session.unpack_message(msg,content=True)
273 275 except Exception, e:
@@ -470,7 +472,7 b' class Controller(object):'
470 472 logger.error("task::invalid task tracking message")
471 473 return
472 474 content = msg['content']
473 print content
475 print (content)
474 476 msg_id = content['msg_id']
475 477 engine_uuid = content['engine_id']
476 478 for eid,queue_id in self.keytable.iteritems():
@@ -752,20 +754,9 b' class Controller(object):'
752 754 #--------------------
753 755 # Entry Point
754 756 #--------------------
755
756 def main():
757 import time
758 from multiprocessing import Process
759
760 from zmq.eventloop.zmqstream import ZMQStream
761 from zmq.devices import ProcessMonitoredQueue
762 from zmq.log import handlers
763
764 import streamsession as session
765 import heartmonitor
766 from scheduler import launch_scheduler
767
768 parser = make_argument_parser()
757 def make_argument_parser():
758 """Make an argument parser"""
759 parser = make_base_argument_parser()
769 760
770 761 parser.add_argument('--client', type=int, metavar='PORT', default=0,
771 762 help='set the XREP port for clients [default: random]')
@@ -787,14 +778,24 b' def main():'
787 778 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
788 779 help='select the task scheduler [default: pure ZMQ]')
789 780
790 args = parser.parse_args()
781 return parser
791 782
792 if args.url:
793 args.transport,iface = args.url.split('://')
794 iface = iface.split(':')
795 args.ip = iface[0]
796 if iface[1]:
797 args.regport = iface[1]
783 def main():
784 import time
785 from multiprocessing import Process
786
787 from zmq.eventloop.zmqstream import ZMQStream
788 from zmq.devices import ProcessMonitoredQueue
789 from zmq.log import handlers
790
791 import streamsession as session
792 import heartmonitor
793 from scheduler import launch_scheduler
794
795 parser = make_argument_parser()
796
797 args = parser.parse_args()
798 parse_url(args)
798 799
799 800 iface="%s://%s"%(args.transport,args.ip)+':%i'
800 801
@@ -891,7 +892,7 b' def main():'
891 892 q.start()
892 893 else:
893 894 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 print sargs
895 print (sargs)
895 896 p = Process(target=launch_scheduler, args=sargs)
896 897 p.daemon=True
897 898 p.start()
@@ -915,5 +916,8 b' def main():'
915 916 'notification': iface%nport
916 917 }
917 918 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
919 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
918 920 loop.start()
919
921
922 if __name__ == '__main__':
923 main()
@@ -3,6 +3,7 b''
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 from __future__ import print_function
6 7 import sys
7 8 import time
8 9 import traceback
@@ -16,7 +17,7 b' from streamsession import Message, StreamSession'
16 17 from client import Client
17 18 import streamkernel as kernel
18 19 import heartmonitor
19 from entry_point import make_argument_parser, connect_logger
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
20 21 # import taskthread
21 22 # from log import logger
22 23
@@ -72,7 +73,7 b' class Engine(object):'
72 73 self.control = zmqstream.ZMQStream(control, self.loop)
73 74
74 75 task_addr = msg.content.task
75 print task_addr
76 print (task_addr)
76 77 if task_addr:
77 78 # task as stream:
78 79 task = self.context.socket(zmq.PAIR)
@@ -103,7 +104,7 b' class Engine(object):'
103 104
104 105 # logger.info("engine::completed registration with id %s"%self.session.username)
105 106
106 print msg
107 print (msg)
107 108
108 109 def unregister(self):
109 110 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -111,24 +112,20 b' class Engine(object):'
111 112 sys.exit(0)
112 113
113 114 def start(self):
114 print "registering"
115 print ("registering")
115 116 self.register()
116 117
117 118
118 119 def main():
119 120
120 parser = make_argument_parser()
121 parser = make_base_argument_parser()
121 122
122 123 args = parser.parse_args()
123 124
124 if args.url:
125 args.transport,iface = args.url.split('://')
126 iface = iface.split(':')
127 args.ip = iface[0]
128 if iface[1]:
129 args.regport = iface[1]
125 parse_url(args)
130 126
131 127 iface="%s://%s"%(args.transport,args.ip)+':%i'
128
132 129 loop = ioloop.IOLoop.instance()
133 130 session = StreamSession()
134 131 ctx = zmq.Context()
@@ -137,8 +134,8 b' def main():'
137 134 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
138 135
139 136 reg_conn = iface % args.regport
140 print reg_conn
141 print >>sys.__stdout__, "Starting the engine..."
137 print (reg_conn)
138 print ("Starting the engine...", file=sys.__stderr__)
142 139
143 140 reg = ctx.socket(zmq.PAIR)
144 141 reg.connect(reg_conn)
@@ -39,9 +39,21 b' def select_random_ports(n):'
39 39 sock.close()
40 40 ports[i] = port
41 41 return ports
42
43 def parse_url(args):
44 if args.url:
45 iface = args.url.split('://',1)
46 if len(args) == 2:
47 args.transport,iface = iface
48 iface = iface.split(':')
49 args.ip = iface[0]
50 if iface[1]:
51 args.regport = iface[1]
52 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
53
42 54
43 55
44 def make_argument_parser():
56 def make_base_argument_parser():
45 57 """ Creates an ArgumentParser for the generic arguments supported by all
46 58 ipcluster entry points.
47 59 """
@@ -58,6 +70,9 b' def make_argument_parser():'
58 70 help='set the log level [default: DEBUG]')
59 71 parser.add_argument('--ident', type=str,
60 72 help='set the ZMQ identity [default: random]')
73 parser.add_argument('--packer', type=str, default='json',
74 choices=['json','pickle'],
75 help='set the message format method [default: json]')
61 76 parser.add_argument('--url', type=str,
62 77 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
63 78
@@ -217,6 +217,7 b" if 'setuptools' in sys.modules:"
217 217 'pycolor = IPython.utils.PyColorize:main',
218 218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 219 'ipenginez = IPython.zmq.parallel.engine:main',
220 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
220 221 'iptest = IPython.testing.iptest:main',
221 222 'irunner = IPython.lib.irunner:main'
222 223 ]
General Comments 0
You need to be logged in to leave comments. Login now