##// END OF EJS Templates
added zmq controller/engine entry points
MinRK -
Show More
@@ -0,0 +1,74 b''
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
3 """
4
5 # Standard library imports.
6 import logging
7 import atexit
8 import os
9 import socket
10 from subprocess import Popen, PIPE
11 import sys
12
13 # System library imports.
14 import zmq
15 from zmq.log import handlers
16 # Local imports.
17 from IPython.core.ultratb import FormattedTB
18 from IPython.external.argparse import ArgumentParser
19 from IPython.zmq.log import logger
20
21 def split_ports(s, n):
22 """Parser helper for multiport strings"""
23 if not s:
24 return tuple([0]*n)
25 ports = map(int, s.split(','))
26 if len(ports) != n:
27 raise ValueError
28 return ports
29
30 def select_random_ports(n):
31 """Selects and return n random ports that are open."""
32 ports = []
33 for i in xrange(n):
34 sock = socket.socket()
35 sock.bind(('', 0))
36 ports.append(sock)
37 for i, sock in enumerate(ports):
38 port = sock.getsockname()[1]
39 sock.close()
40 ports[i] = port
41 return ports
42
43
44 def make_argument_parser():
45 """ Creates an ArgumentParser for the generic arguments supported by all
46 ipcluster entry points.
47 """
48 parser = ArgumentParser()
49 parser.add_argument('--ip', type=str, default='127.0.0.1',
50 help='set the controller\'s IP address [default: local]')
51 parser.add_argument('--transport', type=str, default='tcp',
52 help='set the transport to use [default: tcp]')
53 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
54 help='set the XREP port for registration [default: 10101]')
55 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
56 help='set the PUB port for logging [default: 10201]')
57 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
58 help='set the log level [default: DEBUG]')
59 parser.add_argument('--ident', type=str,
60 help='set the ZMQ identity [default: random]')
61 parser.add_argument('--url', type=str,
62 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
63
64 return parser
65
66
67 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
68 lsock = context.socket(zmq.PUB)
69 lsock.connect(iface)
70 handler = handlers.PUBHandler(lsock)
71 handler.setLevel(loglevel)
72 handler.root_topic = root
73 logger.addHandler(handler)
74 No newline at end of file
@@ -15,15 +15,19 b' This is the master object that handles connections from engines, clients, and'
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 from datetime import datetime
17 from datetime import datetime
18 import logging
18
19
19 import zmq
20 import zmq
20 from zmq.eventloop import zmqstream, ioloop
21 from zmq.eventloop import zmqstream, ioloop
21 import uuid
22 import uuid
22
23
23 # internal:
24 # internal:
24 from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack
25 from IPython.zmq.log import logger # a Logger object
25 from IPython.zmq.log import logger # a Logger object
26 from IPython.zmq.entry_point import bind_port
26
27
28 from streamsession import Message, wrap_exception
29 from entry_point import (make_argument_parser, select_random_ports, split_ports,
30 connect_logger)
27 # from messages import json # use the same import switches
31 # from messages import json # use the same import switches
28
32
29 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
@@ -359,10 +363,11 b' class Controller(object):'
359 triggers unregistration"""
363 triggers unregistration"""
360 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
364 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
361 eid = self.hearts.get(heart, None)
365 eid = self.hearts.get(heart, None)
366 queue = self.engines[eid].queue
362 if eid is None:
367 if eid is None:
363 logger.info("heartbeat::ignoring heart failure %r"%heart)
368 logger.info("heartbeat::ignoring heart failure %r"%heart)
364 else:
369 else:
365 self.unregister_engine(heart, dict(content=dict(id=eid)))
370 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
366
371
367 #----------------------- MUX Queue Traffic ------------------------------
372 #----------------------- MUX Queue Traffic ------------------------------
368
373
@@ -642,29 +647,6 b' class Controller(object):'
642 # pending
647 # pending
643 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
648 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
644
649
645 def job_status(self, client_id, msg):
646 """handle queue_status request"""
647 content = msg['content']
648 msg_ids = content['msg_ids']
649 try:
650 targets = self._validate_targets(targets)
651 except:
652 content = wrap_exception()
653 self.session.send(self.clientele, "controller_error",
654 content=content, ident=client_id)
655 return
656 verbose = msg.get('verbose', False)
657 content = dict()
658 for t in targets:
659 queue = self.queues[t]
660 completed = self.completed[t]
661 if not verbose:
662 queue = len(queue)
663 completed = len(completed)
664 content[str(t)] = {'queue': queue, 'completed': completed }
665 # pending
666 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
667
668 def purge_results(self, client_id, msg):
650 def purge_results(self, client_id, msg):
669 content = msg['content']
651 content = msg['content']
670 msg_ids = content.get('msg_ids', [])
652 msg_ids = content.get('msg_ids', [])
@@ -769,4 +751,171 b' class Controller(object):'
769 return eid, msg
751 return eid, msg
770
752
771
753
772 No newline at end of file
754 #--------------------
755 # Entry Point
756 #--------------------
757
758 def main():
759 import time
760 from multiprocessing import Process
761
762 from zmq.eventloop.zmqstream import ZMQStream
763 from zmq.devices import ProcessMonitoredQueue
764 from zmq.log import handlers
765
766 import streamsession as session
767 import heartmonitor
768 from scheduler import launch_scheduler
769
770 parser = make_argument_parser()
771
772 parser.add_argument('--client', type=int, metavar='PORT', default=0,
773 help='set the XREP port for clients [default: random]')
774 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
775 help='set the PUB socket for registration notification [default: random]')
776 parser.add_argument('--hb', type=str, metavar='PORTS',
777 help='set the 2 ports for heartbeats [default: random]')
778 parser.add_argument('--ping', type=int, default=3000,
779 help='set the heartbeat period in ms [default: 3000]')
780 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
781 help='set the SUB port for queue monitoring [default: random]')
782 parser.add_argument('--mux', type=str, metavar='PORTS',
783 help='set the XREP ports for the MUX queue [default: random]')
784 parser.add_argument('--task', type=str, metavar='PORTS',
785 help='set the XREP/XREQ ports for the task queue [default: random]')
786 parser.add_argument('--control', type=str, metavar='PORTS',
787 help='set the XREP ports for the control queue [default: random]')
788 parser.add_argument('--scheduler', type=str, default='pure',
789 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
790 help='select the task scheduler [default: pure ZMQ]')
791
792 args = parser.parse_args()
793
794 if args.url:
795 args.transport,iface = args.url.split('://')
796 iface = iface.split(':')
797 args.ip = iface[0]
798 if iface[1]:
799 args.regport = iface[1]
800
801 iface="%s://%s"%(args.transport,args.ip)+':%i'
802
803 random_ports = 0
804 if args.hb:
805 hb = split_ports(args.hb, 2)
806 else:
807 hb = select_random_ports(2)
808 if args.mux:
809 mux = split_ports(args.mux, 2)
810 else:
811 mux = None
812 random_ports += 2
813 if args.task:
814 task = split_ports(args.task, 2)
815 else:
816 task = None
817 random_ports += 2
818 if args.control:
819 control = split_ports(args.control, 2)
820 else:
821 control = None
822 random_ports += 2
823
824 ctx = zmq.Context()
825 loop = ioloop.IOLoop.instance()
826
827 # setup logging
828 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
829
830 # Registrar socket
831 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
832 regport = bind_port(reg, args.ip, args.regport)
833
834 ### Engine connections ###
835
836 # heartbeat
837 hpub = ctx.socket(zmq.PUB)
838 bind_port(hpub, args.ip, hb[0])
839 hrep = ctx.socket(zmq.XREP)
840 bind_port(hrep, args.ip, hb[1])
841
842 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
843 hmon.start()
844
845 ### Client connections ###
846 # Clientele socket
847 c = ZMQStream(ctx.socket(zmq.XREP), loop)
848 cport = bind_port(c, args.ip, args.client)
849 # Notifier socket
850 n = ZMQStream(ctx.socket(zmq.PUB), loop)
851 nport = bind_port(n, args.ip, args.notice)
852
853 thesession = session.StreamSession(username=args.ident or "controller")
854
855 ### build and launch the queues ###
856
857 # monitor socket
858 sub = ctx.socket(zmq.SUB)
859 sub.setsockopt(zmq.SUBSCRIBE, "")
860 monport = bind_port(sub, args.ip, args.monitor)
861 sub = ZMQStream(sub, loop)
862
863 ports = select_random_ports(random_ports)
864 # Multiplexer Queue (in a Process)
865 if not mux:
866 mux = (ports.pop(),ports.pop())
867 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
868 q.bind_in(iface%mux[0])
869 q.bind_out(iface%mux[1])
870 q.connect_mon(iface%monport)
871 q.daemon=True
872 q.start()
873
874 # Control Queue (in a Process)
875 if not control:
876 control = (ports.pop(),ports.pop())
877 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
878 q.bind_in(iface%control[0])
879 q.bind_out(iface%control[1])
880 q.connect_mon(iface%monport)
881 q.daemon=True
882 q.start()
883
884 # Task Queue (in a Process)
885 if not task:
886 task = (ports.pop(),ports.pop())
887 if args.scheduler == 'pure':
888 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
889 q.bind_in(iface%task[0])
890 q.bind_out(iface%task[1])
891 q.connect_mon(iface%monport)
892 q.daemon=True
893 q.start()
894 else:
895 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
896 print sargs
897 p = Process(target=launch_scheduler, args=sargs)
898 p.daemon=True
899 p.start()
900
901 time.sleep(.25)
902
903 # build connection dicts
904 engine_addrs = {
905 'control' : iface%control[1],
906 'queue': iface%mux[1],
907 'heartbeat': (iface%hb[0], iface%hb[1]),
908 'task' : iface%task[1],
909 'monitor' : iface%monport,
910 }
911
912 client_addrs = {
913 'control' : iface%control[0],
914 'query': iface%cport,
915 'queue': iface%mux[0],
916 'task' : iface%task[0],
917 'notification': iface%nport
918 }
919 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
920 loop.start()
921
@@ -16,6 +16,7 b' from streamsession import Message, StreamSession'
16 from client import Client
16 from client import Client
17 import streamkernel as kernel
17 import streamkernel as kernel
18 import heartmonitor
18 import heartmonitor
19 from entry_point import make_argument_parser, connect_logger
19 # import taskthread
20 # import taskthread
20 # from log import logger
21 # from log import logger
21
22
@@ -75,6 +76,7 b' class Engine(object):'
75 if task_addr:
76 if task_addr:
76 # task as stream:
77 # task as stream:
77 task = self.context.socket(zmq.PAIR)
78 task = self.context.socket(zmq.PAIR)
79 task.setsockopt(zmq.IDENTITY, self.ident)
78 task.connect(str(task_addr))
80 task.connect(str(task_addr))
79 self.task_stream = zmqstream.ZMQStream(task, self.loop)
81 self.task_stream = zmqstream.ZMQStream(task, self.loop)
80 # TaskThread:
82 # TaskThread:
@@ -113,16 +115,28 b' class Engine(object):'
113 self.register()
115 self.register()
114
116
115
117
116 if __name__ == '__main__':
118 def main():
117
119
120 parser = make_argument_parser()
121
122 args = parser.parse_args()
123
124 if args.url:
125 args.transport,iface = args.url.split('://')
126 iface = iface.split(':')
127 args.ip = iface[0]
128 if iface[1]:
129 args.regport = iface[1]
130
131 iface="%s://%s"%(args.transport,args.ip)+':%i'
118 loop = ioloop.IOLoop.instance()
132 loop = ioloop.IOLoop.instance()
119 session = StreamSession()
133 session = StreamSession()
120 ctx = zmq.Context()
134 ctx = zmq.Context()
121
135
122 ip = '127.0.0.1'
136 # setup logging
123 reg_port = 10101
137 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
124 connection = ('tcp://%s' % ip) + ':%i'
138
125 reg_conn = connection % reg_port
139 reg_conn = iface % args.regport
126 print reg_conn
140 print reg_conn
127 print >>sys.__stdout__, "Starting the engine..."
141 print >>sys.__stdout__, "Starting the engine..."
128
142
@@ -130,12 +144,8 b" if __name__ == '__main__':"
130 reg.connect(reg_conn)
144 reg.connect(reg_conn)
131 reg = zmqstream.ZMQStream(reg, loop)
145 reg = zmqstream.ZMQStream(reg, loop)
132 client = Client(reg_conn)
146 client = Client(reg_conn)
133 if len(sys.argv) > 1:
134 queue_id=sys.argv[1]
135 else:
136 queue_id = None
137
147
138 e = Engine(ctx, loop, session, reg, client, queue_id)
148 e = Engine(ctx, loop, session, reg, client, args.ident)
139 dc = ioloop.DelayedCallback(e.start, 500, loop)
149 dc = ioloop.DelayedCallback(e.start, 100, loop)
140 dc.start()
150 dc.start()
141 loop.start() No newline at end of file
151 loop.start()
@@ -191,7 +191,7 b' class Kernel(object):'
191 # self.reply_socket.send(ident,zmq.SNDMORE)
191 # self.reply_socket.send(ident,zmq.SNDMORE)
192 # self.reply_socket.send_json(reply_msg)
192 # self.reply_socket.send_json(reply_msg)
193 reply_msg = self.session.send(stream, reply_type,
193 reply_msg = self.session.send(stream, reply_type,
194 content={'status' : 'aborted'}, parent=msg, ident=idents)
194 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
195 print>>sys.__stdout__, Message(reply_msg)
195 print>>sys.__stdout__, Message(reply_msg)
196 # We need to wait a bit for requests to come in. This can probably
196 # We need to wait a bit for requests to come in. This can probably
197 # be set shorter for true asynchronous clients.
197 # be set shorter for true asynchronous clients.
@@ -208,8 +208,8 b' class Kernel(object):'
208 self.aborted.add(str(mid))
208 self.aborted.add(str(mid))
209
209
210 content = dict(status='ok')
210 content = dict(status='ok')
211 reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent,
211 reply_msg = self.session.send(stream, 'abort_reply', content=content,
212 ident=ident)
212 parent=parent, ident=ident)[0]
213 print>>sys.__stdout__, Message(reply_msg)
213 print>>sys.__stdout__, Message(reply_msg)
214
214
215 def kill_request(self, stream, idents, parent):
215 def kill_request(self, stream, idents, parent):
@@ -312,7 +312,7 b' class Kernel(object):'
312 # self.reply_socket.send(ident, zmq.SNDMORE)
312 # self.reply_socket.send(ident, zmq.SNDMORE)
313 # self.reply_socket.send_json(reply_msg)
313 # self.reply_socket.send_json(reply_msg)
314 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
314 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
315 # print>>sys.__stdout__, Message(reply_msg)
315 print>>sys.__stdout__, Message(reply_msg)
316 if reply_msg['content']['status'] == u'error':
316 if reply_msg['content']['status'] == u'error':
317 self.abort_queues()
317 self.abort_queues()
318
318
@@ -327,6 +327,7 b' class Kernel(object):'
327 return self.completer.complete(msg.content.line, msg.content.text)
327 return self.completer.complete(msg.content.line, msg.content.text)
328
328
329 def apply_request(self, stream, ident, parent):
329 def apply_request(self, stream, ident, parent):
330 print parent
330 try:
331 try:
331 content = parent[u'content']
332 content = parent[u'content']
332 bufs = parent[u'buffers']
333 bufs = parent[u'buffers']
@@ -399,7 +400,7 b' class Kernel(object):'
399 # self.reply_socket.send_json(reply_msg)
400 # self.reply_socket.send_json(reply_msg)
400 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
401 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
401 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
402 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
402 # print>>sys.__stdout__, Message(reply_msg)
403 print>>sys.__stdout__, Message(reply_msg)
403 # if reply_msg['content']['status'] == u'error':
404 # if reply_msg['content']['status'] == u'error':
404 # self.abort_queues()
405 # self.abort_queues()
405
406
@@ -16,7 +16,7 b' config = {'
16 'clientport': 10201,
16 'clientport': 10201,
17 'notifierport': 10202,
17 'notifierport': 10202,
18
18
19 'logport': 20201
19 'logport': 20202
20 }
20 }
21
21
22
22
@@ -18,7 +18,7 b' from IPython.zmq.parallel import controller, heartmonitor, streamsession as sess'
18
18
19 def setup():
19 def setup():
20 """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
20 """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
21 ctx = zmq.Context(1)
21 ctx = zmq.Context()
22 loop = ioloop.IOLoop.instance()
22 loop = ioloop.IOLoop.instance()
23
23
24 # port config
24 # port config
@@ -20,7 +20,7 b''
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21
21
22 import zmq
22 import zmq
23 logport = 20201
23 logport = 20202
24 def main(topics, addrs):
24 def main(topics, addrs):
25
25
26 context = zmq.Context()
26 context = zmq.Context()
@@ -215,9 +215,8 b" if 'setuptools' in sys.modules:"
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 'pycolor = IPython.utils.PyColorize:main',
217 'pycolor = IPython.utils.PyColorize:main',
218 # 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 # 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
219 'ipenginez = IPython.zmq.parallel.engine:main',
220 # 'ipcluster = IPython.kernel.ipclusterapp:launch_new_instance',
221 'iptest = IPython.testing.iptest:main',
220 'iptest = IPython.testing.iptest:main',
222 'irunner = IPython.lib.irunner:main'
221 'irunner = IPython.lib.irunner:main'
223 ]
222 ]
General Comments 0
You need to be logged in to leave comments. Login now