##// END OF EJS Templates
added simple cluster entry point
MinRK -
Show More
@@ -0,0 +1,84 b''
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys,os
4 from subprocess import Popen, PIPE
5
6 from entry_point import parse_url
7 from controller import make_argument_parser
8
9 def _filter_arg(flag, args):
10 filtered = []
11 if flag in args:
12 filtered.append(flag)
13 idx = args.index(flag)
14 if len(args) > idx+1:
15 if not args[idx+1].startswith('-'):
16 filtered.append(args[idx+1])
17 return filtered
18
19 def filter_args(flags, args=sys.argv[1:]):
20 filtered = []
21 for flag in flags:
22 if isinstance(flag, (list,tuple)):
23 for f in flag:
24 filtered.extend(_filter_arg(f, args))
25 else:
26 filtered.extend(_filter_arg(flag, args))
27 return filtered
28
29 def _strip_arg(flag, args):
30 while flag in args:
31 idx = args.index(flag)
32 args.pop(idx)
33 if len(args) > idx:
34 if not args[idx].startswith('-'):
35 args.pop(idx)
36
37 def strip_args(flags, args=sys.argv[1:]):
38 args = list(args)
39 for flag in flags:
40 if isinstance(flag, (list,tuple)):
41 for f in flag:
42 _strip_arg(f, args)
43 else:
44 _strip_arg(flag, args)
45 return args
46
47
48 def launch_process(mod, args):
49 """Launch a controller or engine in a subprocess."""
50 code = "from IPython.zmq.parallel.%s import main;main()"%mod
51 arguments = [ sys.executable, '-c', code ] + args
52 blackholew = file(os.devnull, 'w')
53 blackholer = file(os.devnull, 'r')
54
55 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew)
56 return proc
57
58 def main():
59 parser = make_argument_parser()
60 parser.add_argument('--n', '-n', type=int, default=1,
61 help="The number of engines to start.")
62 args = parser.parse_args()
63 parse_url(args)
64
65 controller_args = strip_args([('--n','-n')])
66 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
67 '--transport','--loglevel','--packer'])+['--ident']
68
69 controller = launch_process('controller', controller_args)
70 print("Launched Controller at %s"%args.url)
71 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
72 print("%i Engines started"%args.n)
73
74 def wait_quietly(p):
75 try:
76 p.wait()
77 except KeyboardInterrupt:
78 pass
79 wait_quietly(controller)
80 map(wait_quietly, engines)
81 print ("Done")
82
83 if __name__ == '__main__':
84 main() No newline at end of file
@@ -12,6 +12,8 b' This is the master object that handles connections from engines, clients, and'
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
15 from datetime import datetime
17 from datetime import datetime
16 import logging
18 import logging
17
19
@@ -24,8 +26,8 b' from IPython.zmq.log import logger # a Logger object'
24 from IPython.zmq.entry_point import bind_port
26 from IPython.zmq.entry_point import bind_port
25
27
26 from streamsession import Message, wrap_exception
28 from streamsession import Message, wrap_exception
27 from entry_point import (make_argument_parser, select_random_ports, split_ports,
29 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
28 connect_logger)
30 connect_logger, parse_url)
29 # from messages import json # use the same import switches
31 # from messages import json # use the same import switches
30
32
31 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
@@ -267,7 +269,7 b' class Controller(object):'
267 """"""
269 """"""
268 logger.debug("registration::dispatch_register_request(%s)"%msg)
270 logger.debug("registration::dispatch_register_request(%s)"%msg)
269 idents,msg = self.session.feed_identities(msg)
271 idents,msg = self.session.feed_identities(msg)
270 print idents,msg, len(msg)
272 print (idents,msg, len(msg))
271 try:
273 try:
272 msg = self.session.unpack_message(msg,content=True)
274 msg = self.session.unpack_message(msg,content=True)
273 except Exception, e:
275 except Exception, e:
@@ -470,7 +472,7 b' class Controller(object):'
470 logger.error("task::invalid task tracking message")
472 logger.error("task::invalid task tracking message")
471 return
473 return
472 content = msg['content']
474 content = msg['content']
473 print content
475 print (content)
474 msg_id = content['msg_id']
476 msg_id = content['msg_id']
475 engine_uuid = content['engine_id']
477 engine_uuid = content['engine_id']
476 for eid,queue_id in self.keytable.iteritems():
478 for eid,queue_id in self.keytable.iteritems():
@@ -752,20 +754,9 b' class Controller(object):'
752 #--------------------
754 #--------------------
753 # Entry Point
755 # Entry Point
754 #--------------------
756 #--------------------
755
757 def make_argument_parser():
756 def main():
758 """Make an argument parser"""
757 import time
759 parser = make_base_argument_parser()
758 from multiprocessing import Process
759
760 from zmq.eventloop.zmqstream import ZMQStream
761 from zmq.devices import ProcessMonitoredQueue
762 from zmq.log import handlers
763
764 import streamsession as session
765 import heartmonitor
766 from scheduler import launch_scheduler
767
768 parser = make_argument_parser()
769
760
770 parser.add_argument('--client', type=int, metavar='PORT', default=0,
761 parser.add_argument('--client', type=int, metavar='PORT', default=0,
771 help='set the XREP port for clients [default: random]')
762 help='set the XREP port for clients [default: random]')
@@ -787,14 +778,24 b' def main():'
787 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
778 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
788 help='select the task scheduler [default: pure ZMQ]')
779 help='select the task scheduler [default: pure ZMQ]')
789
780
790 args = parser.parse_args()
781 return parser
791
782
792 if args.url:
783 def main():
793 args.transport,iface = args.url.split('://')
784 import time
794 iface = iface.split(':')
785 from multiprocessing import Process
795 args.ip = iface[0]
786
796 if iface[1]:
787 from zmq.eventloop.zmqstream import ZMQStream
797 args.regport = iface[1]
788 from zmq.devices import ProcessMonitoredQueue
789 from zmq.log import handlers
790
791 import streamsession as session
792 import heartmonitor
793 from scheduler import launch_scheduler
794
795 parser = make_argument_parser()
796
797 args = parser.parse_args()
798 parse_url(args)
798
799
799 iface="%s://%s"%(args.transport,args.ip)+':%i'
800 iface="%s://%s"%(args.transport,args.ip)+':%i'
800
801
@@ -891,7 +892,7 b' def main():'
891 q.start()
892 q.start()
892 else:
893 else:
893 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 print sargs
895 print (sargs)
895 p = Process(target=launch_scheduler, args=sargs)
896 p = Process(target=launch_scheduler, args=sargs)
896 p.daemon=True
897 p.daemon=True
897 p.start()
898 p.start()
@@ -915,5 +916,8 b' def main():'
915 'notification': iface%nport
916 'notification': iface%nport
916 }
917 }
917 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
918 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
919 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
918 loop.start()
920 loop.start()
919
921
922 if __name__ == '__main__':
923 main()
@@ -3,6 +3,7 b''
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 import sys
7 import sys
7 import time
8 import time
8 import traceback
9 import traceback
@@ -16,7 +17,7 b' from streamsession import Message, StreamSession'
16 from client import Client
17 from client import Client
17 import streamkernel as kernel
18 import streamkernel as kernel
18 import heartmonitor
19 import heartmonitor
19 from entry_point import make_argument_parser, connect_logger
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
20 # import taskthread
21 # import taskthread
21 # from log import logger
22 # from log import logger
22
23
@@ -72,7 +73,7 b' class Engine(object):'
72 self.control = zmqstream.ZMQStream(control, self.loop)
73 self.control = zmqstream.ZMQStream(control, self.loop)
73
74
74 task_addr = msg.content.task
75 task_addr = msg.content.task
75 print task_addr
76 print (task_addr)
76 if task_addr:
77 if task_addr:
77 # task as stream:
78 # task as stream:
78 task = self.context.socket(zmq.PAIR)
79 task = self.context.socket(zmq.PAIR)
@@ -103,7 +104,7 b' class Engine(object):'
103
104
104 # logger.info("engine::completed registration with id %s"%self.session.username)
105 # logger.info("engine::completed registration with id %s"%self.session.username)
105
106
106 print msg
107 print (msg)
107
108
108 def unregister(self):
109 def unregister(self):
109 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -111,24 +112,20 b' class Engine(object):'
111 sys.exit(0)
112 sys.exit(0)
112
113
113 def start(self):
114 def start(self):
114 print "registering"
115 print ("registering")
115 self.register()
116 self.register()
116
117
117
118
118 def main():
119 def main():
119
120
120 parser = make_argument_parser()
121 parser = make_base_argument_parser()
121
122
122 args = parser.parse_args()
123 args = parser.parse_args()
123
124
124 if args.url:
125 parse_url(args)
125 args.transport,iface = args.url.split('://')
126 iface = iface.split(':')
127 args.ip = iface[0]
128 if iface[1]:
129 args.regport = iface[1]
130
126
131 iface="%s://%s"%(args.transport,args.ip)+':%i'
127 iface="%s://%s"%(args.transport,args.ip)+':%i'
128
132 loop = ioloop.IOLoop.instance()
129 loop = ioloop.IOLoop.instance()
133 session = StreamSession()
130 session = StreamSession()
134 ctx = zmq.Context()
131 ctx = zmq.Context()
@@ -137,8 +134,8 b' def main():'
137 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
134 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
138
135
139 reg_conn = iface % args.regport
136 reg_conn = iface % args.regport
140 print reg_conn
137 print (reg_conn)
141 print >>sys.__stdout__, "Starting the engine..."
138 print ("Starting the engine...", file=sys.__stderr__)
142
139
143 reg = ctx.socket(zmq.PAIR)
140 reg = ctx.socket(zmq.PAIR)
144 reg.connect(reg_conn)
141 reg.connect(reg_conn)
@@ -39,9 +39,21 b' def select_random_ports(n):'
39 sock.close()
39 sock.close()
40 ports[i] = port
40 ports[i] = port
41 return ports
41 return ports
42
43 def parse_url(args):
44 if args.url:
45 iface = args.url.split('://',1)
46 if len(args) == 2:
47 args.transport,iface = iface
48 iface = iface.split(':')
49 args.ip = iface[0]
50 if iface[1]:
51 args.regport = iface[1]
52 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
53
42
54
43
55
44 def make_argument_parser():
56 def make_base_argument_parser():
45 """ Creates an ArgumentParser for the generic arguments supported by all
57 """ Creates an ArgumentParser for the generic arguments supported by all
46 ipcluster entry points.
58 ipcluster entry points.
47 """
59 """
@@ -58,6 +70,9 b' def make_argument_parser():'
58 help='set the log level [default: DEBUG]')
70 help='set the log level [default: DEBUG]')
59 parser.add_argument('--ident', type=str,
71 parser.add_argument('--ident', type=str,
60 help='set the ZMQ identity [default: random]')
72 help='set the ZMQ identity [default: random]')
73 parser.add_argument('--packer', type=str, default='json',
74 choices=['json','pickle'],
75 help='set the message format method [default: json]')
61 parser.add_argument('--url', type=str,
76 parser.add_argument('--url', type=str,
62 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
77 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
63
78
@@ -217,6 +217,7 b" if 'setuptools' in sys.modules:"
217 'pycolor = IPython.utils.PyColorize:main',
217 'pycolor = IPython.utils.PyColorize:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 'ipenginez = IPython.zmq.parallel.engine:main',
219 'ipenginez = IPython.zmq.parallel.engine:main',
220 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
220 'iptest = IPython.testing.iptest:main',
221 'iptest = IPython.testing.iptest:main',
221 'irunner = IPython.lib.irunner:main'
222 'irunner = IPython.lib.irunner:main'
222 ]
223 ]
General Comments 0
You need to be logged in to leave comments. Login now