##// END OF EJS Templates
improved logging + Hub,Engine,Scheduler are Configurable
MinRK -
Show More
@@ -21,7 +21,3 b' class EnginePUBHandler(PUBHandler):'
21 else:
21 else:
22 return "engine"
22 return "engine"
23
23
24
25 logger = logging.getLogger('ipzmq')
26 logger.setLevel(logging.DEBUG)
27
@@ -90,11 +90,6 b' def defaultblock(f, self, *args, **kwargs):'
90 # Classes
90 # Classes
91 #--------------------------------------------------------------------------
91 #--------------------------------------------------------------------------
92
92
93 class AbortedTask(object):
94 """A basic wrapper object describing an aborted task."""
95 def __init__(self, msg_id):
96 self.msg_id = msg_id
97
98 class ResultDict(dict):
93 class ResultDict(dict):
99 """A subclass of dict that raises errors if it has them."""
94 """A subclass of dict that raises errors if it has them."""
100 def __getitem__(self, key):
95 def __getitem__(self, key):
@@ -332,10 +327,10 b' class Client(object):'
332 msg = ss.Message(msg)
327 msg = ss.Message(msg)
333 content = msg.content
328 content = msg.content
334 if content.status == 'ok':
329 if content.status == 'ok':
335 if content.queue:
330 if content.mux:
336 self._mux_socket = self.context.socket(zmq.PAIR)
331 self._mux_socket = self.context.socket(zmq.PAIR)
337 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
332 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
338 connect_socket(self._mux_socket, content.queue)
333 connect_socket(self._mux_socket, content.mux)
339 if content.task:
334 if content.task:
340 self._task_socket = self.context.socket(zmq.PAIR)
335 self._task_socket = self.context.socket(zmq.PAIR)
341 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
336 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
@@ -17,6 +17,7 b' from __future__ import print_function'
17
17
18 import os
18 import os
19 import time
19 import time
20 import logging
20 from multiprocessing import Process
21 from multiprocessing import Process
21
22
22 import zmq
23 import zmq
@@ -29,7 +30,8 b' from IPython.zmq.entry_point import bind_port'
29
30
30 from hub import Hub
31 from hub import Hub
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
33 connect_logger, parse_url, signal_children, generate_exec_key,
34 local_logger)
33
35
34
36
35 import streamsession as session
37 import streamsession as session
@@ -118,8 +120,6 b' def main(argv=None):'
118 ctx = zmq.Context()
120 ctx = zmq.Context()
119 loop = ioloop.IOLoop.instance()
121 loop = ioloop.IOLoop.instance()
120
122
121 # setup logging
122 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
123
123
124 # Registrar socket
124 # Registrar socket
125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
@@ -207,7 +207,9 b' def main(argv=None):'
207 q.start()
207 q.start()
208 children.append(q.launcher)
208 children.append(q.launcher)
209 else:
209 else:
210 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
210 log_addr = iface%args.logport if args.logport else None
211 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
212 log_addr, args.loglevel, args.scheduler)
211 print (sargs)
213 print (sargs)
212 q = Process(target=launch_scheduler, args=sargs)
214 q = Process(target=launch_scheduler, args=sargs)
213 q.daemon=True
215 q.daemon=True
@@ -224,7 +226,7 b' def main(argv=None):'
224 # build connection dicts
226 # build connection dicts
225 engine_addrs = {
227 engine_addrs = {
226 'control' : iface%control[1],
228 'control' : iface%control[1],
227 'queue': iface%mux[1],
229 'mux': iface%mux[1],
228 'heartbeat': (iface%hb[0], iface%hb[1]),
230 'heartbeat': (iface%hb[0], iface%hb[1]),
229 'task' : iface%task[1],
231 'task' : iface%task[1],
230 'iopub' : iface%iopub[1],
232 'iopub' : iface%iopub[1],
@@ -234,15 +236,24 b' def main(argv=None):'
234 client_addrs = {
236 client_addrs = {
235 'control' : iface%control[0],
237 'control' : iface%control[0],
236 'query': iface%cport,
238 'query': iface%cport,
237 'queue': iface%mux[0],
239 'mux': iface%mux[0],
238 'task' : iface%task[0],
240 'task' : iface%task[0],
239 'iopub' : iface%iopub[0],
241 'iopub' : iface%iopub[0],
240 'notification': iface%nport
242 'notification': iface%nport
241 }
243 }
242
244
245 # setup logging
246 if args.logport:
247 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
248 else:
249 local_logger(args.loglevel)
250
243 # register relay of signals to the children
251 # register relay of signals to the children
244 signal_children(children)
252 signal_children(children)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
253 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
254 registrar=reg, clientele=c, notifier=n, db=db,
255 engine_addrs=engine_addrs, client_addrs=client_addrs)
256
246 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
247 dc.start()
258 dc.start()
248 loop.start()
259 loop.start()
@@ -8,49 +8,48 b' import sys'
8 import time
8 import time
9 import traceback
9 import traceback
10 import uuid
10 import uuid
11 import logging
11 from pprint import pprint
12 from pprint import pprint
12
13
13 import zmq
14 import zmq
14 from zmq.eventloop import ioloop, zmqstream
15 from zmq.eventloop import ioloop, zmqstream
15
16
16 from IPython.utils.traitlets import HasTraits
17 # internal
17 from IPython.utils.localinterfaces import LOCALHOST
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
20 # from IPython.utils.localinterfaces import LOCALHOST
18
21
19 from streamsession import Message, StreamSession
22 from streamsession import Message, StreamSession
20 from client import Client
21 from streamkernel import Kernel, make_kernel
23 from streamkernel import Kernel, make_kernel
22 import heartmonitor
24 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
25 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 local_logger)
24 # import taskthread
27 # import taskthread
25 # from log import logger
28 logger = logging.getLogger()
26
27
29
28 def printer(*msg):
30 def printer(*msg):
29 pprint(msg, stream=sys.__stdout__)
31 # print (logger.handlers, file=sys.__stdout__)
32 logger.info(str(msg))
30
33
31 class Engine(object):
34 class Engine(Configurable):
32 """IPython engine"""
35 """IPython engine"""
33
36
34 id=None
35 context=None
36 loop=None
37 session=None
38 ident=None
39 registrar=None
40 heart=None
41 kernel=None
37 kernel=None
42 user_ns=None
38 id=None
43
39
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
40 # configurables:
45 heart_id=None, user_ns=None):
41 context=Instance(zmq.Context)
46 self.context = context
42 loop=Instance(ioloop.IOLoop)
47 self.loop = loop
43 session=Instance(StreamSession)
48 self.session = session
44 ident=Str()
49 self.registrar = registrar
45 registrar=Instance(zmqstream.ZMQStream)
50 self.client = client
46 user_ns=Dict()
51 self.ident = ident if ident else str(uuid.uuid4())
47
48 def __init__(self, **kwargs):
49 super(Engine, self).__init__(**kwargs)
50 if not self.ident:
51 self.ident = str(uuid.uuid4())
52 self.registrar.on_send(printer)
52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
54
53
55 def register(self):
54 def register(self):
56
55
@@ -64,8 +63,9 b' class Engine(object):'
64 idents,msg = self.session.feed_identities(msg)
63 idents,msg = self.session.feed_identities(msg)
65 msg = Message(self.session.unpack_message(msg))
64 msg = Message(self.session.unpack_message(msg))
66 if msg.content.status == 'ok':
65 if msg.content.status == 'ok':
67 self.session.username = str(msg.content.id)
66 self.id = int(msg.content.id)
68 queue_addr = msg.content.queue
67 self.session.username = 'engine-%i'%self.id
68 queue_addr = msg.content.mux
69 shell_addrs = [str(queue_addr)]
69 shell_addrs = [ str(queue_addr) ]
70 control_addr = str(msg.content.control)
70 control_addr = str(msg.content.control)
71 task_addr = msg.content.task
71 task_addr = msg.content.task
@@ -75,7 +75,7 b' class Engine(object):'
75
75
76 hb_addrs = msg.content.heartbeat
76 hb_addrs = msg.content.heartbeat
77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
78 k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
79 hb_addrs, client_addr=None, loop=self.loop,
79 hb_addrs, client_addr=None, loop=self.loop,
80 context=self.context, key=self.session.key)[-1]
80 context=self.context, key=self.session.key)[-1]
81 self.kernel = k
81 self.kernel = k
@@ -84,12 +84,12 b' class Engine(object):'
84 self.kernel.user_ns = self.user_ns
84 self.kernel.user_ns = self.user_ns
85
85
86 else:
86 else:
87 # logger.error("Registration Failed: %s"%msg)
87 logger.error("Registration Failed: %s"%msg)
88 raise Exception("Registration Failed: %s"%msg)
88 raise Exception("Registration Failed: %s"%msg)
89
89
90 # logger.info("engine::completed registration with id %s"%self.session.username)
90 logger.info("completed registration with id %i"%self.id)
91
91
92 print (msg,file=sys.__stdout__)
92 # logger.info(str(msg))
93
93
94 def unregister(self):
94 def unregister(self):
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -97,7 +97,7 b' class Engine(object):'
97 sys.exit(0)
97 sys.exit(0)
98
98
99 def start(self):
99 def start(self):
100 print ("registering",file=sys.__stdout__)
100 logger.info("registering")
101 self.register()
101 self.register()
102
102
103
103
@@ -118,7 +118,6 b' def main(argv=None, user_ns=None):'
118 ctx = zmq.Context()
118 ctx = zmq.Context()
119
119
120 # setup logging
120 # setup logging
121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
122
121
123 reg_conn = iface % args.regport
122 reg_conn = iface % args.regport
124 print (reg_conn, file=sys.__stdout__)
123 print (reg_conn, file=sys.__stdout__)
@@ -127,10 +126,16 b' def main(argv=None, user_ns=None):'
127 reg = ctx.socket(zmq.PAIR)
126 reg = ctx.socket(zmq.PAIR)
128 reg.connect(reg_conn)
127 reg.connect(reg_conn)
129 reg = zmqstream.ZMQStream(reg, loop)
128 reg = zmqstream.ZMQStream(reg, loop)
130 client = None
131
129
132 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
130 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
133 dc = ioloop.DelayedCallback(e.start, 100, loop)
131 ident=args.ident or '', user_ns=user_ns)
132 if args.logport:
133 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
134 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
135 else:
136 local_logger(args.loglevel)
137
138 dc = ioloop.DelayedCallback(e.start, 0, loop)
134 dc.start()
139 dc.start()
135 loop.start()
140 loop.start()
136
141
@@ -22,7 +22,7 b' from zmq.log import handlers'
22 # Local imports.
22 # Local imports.
23 from IPython.core.ultratb import FormattedTB
23 from IPython.core.ultratb import FormattedTB
24 from IPython.external.argparse import ArgumentParser
24 from IPython.external.argparse import ArgumentParser
25 from IPython.zmq.log import logger
25 from IPython.zmq.log import EnginePUBHandler
26
26
27 def split_ports(s, n):
27 def split_ports(s, n):
28 """Parser helper for multiport strings"""
28 """Parser helper for multiport strings"""
@@ -82,6 +82,7 b' def make_base_argument_parser():'
82 """ Creates an ArgumentParser for the generic arguments supported by all
82 """ Creates an ArgumentParser for the generic arguments supported by all
83 ipcluster entry points.
83 ipcluster entry points.
84 """
84 """
85
85 parser = ArgumentParser()
86 parser = ArgumentParser()
86 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 help='set the controller\'s IP address [default: local]')
88 help='set the controller\'s IP address [default: local]')
@@ -89,10 +90,10 b' def make_base_argument_parser():'
89 help='set the transport to use [default: tcp]')
90 help='set the transport to use [default: tcp]')
90 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 help='set the XREP port for registration [default: 10101]')
92 help='set the XREP port for registration [default: 10101]')
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
93 help='set the PUB port for logging [default: 10201]')
94 help='set the PUB port for remote logging [default: log to stdout]')
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
95 help='set the log level [default: DEBUG]')
96 help='set the log level [default: INFO]')
96 parser.add_argument('--ident', type=str,
97 parser.add_argument('--ident', type=str,
97 help='set the ZMQ identity [default: random]')
98 help='set the ZMQ identity [default: random]')
98 parser.add_argument('--packer', type=str, default='json',
99 parser.add_argument('--packer', type=str, default='json',
@@ -105,17 +106,42 b' def make_base_argument_parser():'
105
106
106 return parser
107 return parser
107
108
108
109 def integer_loglevel(loglevel):
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
110 try:
110 try:
111 loglevel = int(loglevel)
111 loglevel = int(loglevel)
112 except ValueError:
112 except ValueError:
113 if isinstance(loglevel, str):
113 if isinstance(loglevel, str):
114 loglevel = getattr(logging, loglevel)
114 loglevel = getattr(logging, loglevel)
115 return loglevel
116
117 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
118 loglevel = integer_loglevel(loglevel)
115 lsock = context.socket(zmq.PUB)
119 lsock = context.socket(zmq.PUB)
116 lsock.connect(iface)
120 lsock.connect(iface)
117 handler = handlers.PUBHandler(lsock)
121 handler = handlers.PUBHandler(lsock)
118 handler.setLevel(loglevel)
122 handler.setLevel(loglevel)
119 handler.root_topic = root
123 handler.root_topic = root
124 logger = logging.getLogger()
125 logger.addHandler(handler)
126 logger.setLevel(loglevel)
127
128 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
129 logger = logging.getLogger()
130 loglevel = integer_loglevel(loglevel)
131 lsock = context.socket(zmq.PUB)
132 lsock.connect(iface)
133 handler = EnginePUBHandler(engine, lsock)
134 handler.setLevel(loglevel)
120 logger.addHandler(handler)
135 logger.addHandler(handler)
136 logger.setLevel(loglevel)
121
137
138 def local_logger(loglevel=logging.DEBUG):
139 loglevel = integer_loglevel(loglevel)
140 logger = logging.getLogger()
141 if logger.handlers:
142 # if there are any handlers, skip the hookup
143 return
144 handler = logging.StreamHandler()
145 handler.setLevel(loglevel)
146 logger.addHandler(handler)
147 logger.setLevel(loglevel)
@@ -7,13 +7,13 b' and hearts are tracked based on their XREQ identities.'
7 from __future__ import print_function
7 from __future__ import print_function
8 import time
8 import time
9 import uuid
9 import uuid
10 import logging
10
11
11 import zmq
12 import zmq
12 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
14
15
15 #internal
16 logger = logging.getLogger()
16 from IPython.zmq.log import logger
17
17
18 class Heart(object):
18 class Heart(object):
19 """A basic heart object for responding to a HeartMonitor.
19 """A basic heart object for responding to a HeartMonitor.
@@ -53,6 +53,7 b' class HeartMonitor(object):'
53 hearts=None
53 hearts=None
54 on_probation=None
54 on_probation=None
55 last_ping=None
55 last_ping=None
56 # debug=False
56
57
57 def __init__(self, loop, pingstream, pongstream, period=1000):
58 def __init__(self, loop, pingstream, pongstream, period=1000):
58 self.loop = loop
59 self.loop = loop
@@ -85,19 +86,6 b' class HeartMonitor(object):'
85 logger.debug("heartbeat::new heart failure handler: %s"%handler)
86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
86 self._failure_handlers.add(handler)
87 self._failure_handlers.add(handler)
87
88
88 # def _flush(self):
89 # """override IOLoop triggers"""
90 # while True:
91 # try:
92 # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
93 # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
94 # except zmq.ZMQError:
95 # return
96 # else:
97 # self.handle_pong(msg)
98 # # print '.'
99 #
100
101 def beat(self):
89 def beat(self):
102 self.pongstream.flush()
90 self.pongstream.flush()
103 self.last_ping = self.lifetime
91 self.last_ping = self.lifetime
@@ -105,7 +93,7 b' class HeartMonitor(object):'
105 toc = time.time()
93 toc = time.time()
106 self.lifetime += toc-self.tic
94 self.lifetime += toc-self.tic
107 self.tic = toc
95 self.tic = toc
108 logger.debug("heartbeat::%s"%self.lifetime)
96 # logger.debug("heartbeat::%s"%self.lifetime)
109 goodhearts = self.hearts.intersection(self.responses)
97 goodhearts = self.hearts.intersection(self.responses)
110 missed_beats = self.hearts.difference(goodhearts)
98 missed_beats = self.hearts.difference(goodhearts)
111 heartfailures = self.on_probation.intersection(missed_beats)
99 heartfailures = self.on_probation.intersection(missed_beats)
@@ -144,7 +132,7 b' class HeartMonitor(object):'
144 "a heart just beat"
132 "a heart just beat"
145 if msg[1] == str(self.lifetime):
133 if msg[1] == str(self.lifetime):
146 delta = time.time()-self.tic
134 delta = time.time()-self.tic
147 logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
135 # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
148 self.responses.add(msg[0])
136 self.responses.add(msg[0])
149 elif msg[1] == str(self.last_ping):
137 elif msg[1] == str(self.last_ping):
150 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
138 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
@@ -18,14 +18,19 b' from __future__ import print_function'
18 import sys
18 import sys
19 from datetime import datetime
19 from datetime import datetime
20 import time
20 import time
21 import logging
21
22
22 import zmq
23 import zmq
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop, zmqstream
24
25
25 # internal:
26 # internal:
26 from IPython.zmq.log import logger # a Logger object
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 # from IPython.zmq.log import logger # a Logger object
27
30
28 from streamsession import Message, wrap_exception, ISO8601
31 from streamsession import Message, wrap_exception, ISO8601
32 from heartmonitor import HeartMonitor
33 from util import validate_url_container
29
34
30 try:
35 try:
31 from pymongo.binary import Binary
36 from pymongo.binary import Binary
@@ -38,6 +43,8 b' else:'
38 # Code
43 # Code
39 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
40
45
46 logger = logging.getLogger()
47
41 def _passer(*args, **kwargs):
48 def _passer(*args, **kwargs):
42 return
49 return
43
50
@@ -46,7 +53,7 b' def _printer(*args, **kwargs):'
46 print (kwargs)
53 print (kwargs)
47
54
48 def init_record(msg):
55 def init_record(msg):
49 """return an empty TaskRecord dict, with all keys initialized with None."""
56 """Initialize a TaskRecord based on a request."""
50 header = msg['header']
57 header = msg['header']
51 return {
58 return {
52 'msg_id' : header['msg_id'],
59 'msg_id' : header['msg_id'],
@@ -71,7 +78,7 b' def init_record(msg):'
71 }
78 }
72
79
73
80
74 class EngineConnector(object):
81 class EngineConnector(HasTraits):
75 """A simple object for accessing the various zmq connections of an object.
82 """A simple object for accessing the various zmq connections of an object.
76 Attributes are:
83 Attributes are:
77 id (int): engine ID
84 id (int): engine ID
@@ -80,22 +87,18 b' class EngineConnector(object):'
80 registration (str): identity of registration XREQ socket
87 registration (str): identity of registration XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
88 heartbeat (str): identity of heartbeat XREQ socket
82 """
89 """
83 id=0
90 id=Int(0)
84 queue=None
91 queue=Str()
85 control=None
92 control=Str()
86 registration=None
93 registration=Str()
87 heartbeat=None
94 heartbeat=Str()
88 pending=None
95 pending=Instance(set)
89
96
90 def __init__(self, id, queue, registration, control, heartbeat=None):
97 def __init__(self, **kwargs):
91 logger.info("engine::Engine Connected: %i"%id)
98 super(EngineConnector, self).__init__(**kwargs)
92 self.id = id
99 logger.info("engine::Engine Connected: %i"%self.id)
93 self.queue = queue
100
94 self.registration = registration
101 class Hub(Configurable):
95 self.control = control
96 self.heartbeat = heartbeat
97
98 class Hub(object):
99 """The IPython Controller Hub with 0MQ connections
102 """The IPython Controller Hub with 0MQ connections
100
103
101 Parameters
104 Parameters
@@ -123,26 +126,25 b' class Hub(object):'
123 clients=None
126 clients=None
124 hearts=None
127 hearts=None
125 pending=None
128 pending=None
126 results=None
127 tasks=None
129 tasks=None
128 completed=None
130 completed=None
129 mia=None
131 # mia=None
130 incoming_registrations=None
132 incoming_registrations=None
131 registration_timeout=None
133 registration_timeout=None
132
134
133 #objects from constructor:
135 # objects from constructor:
134 loop=None
136 loop=Instance(ioloop.IOLoop)
135 registrar=None
137 registrar=Instance(zmqstream.ZMQStream)
136 clientelle=None
138 clientele=Instance(zmqstream.ZMQStream)
137 queue=None
139 monitor=Instance(zmqstream.ZMQStream)
138 heartbeat=None
140 heartmonitor=Instance(HeartMonitor)
139 notifier=None
141 notifier=Instance(zmqstream.ZMQStream)
140 db=None
142 db=Instance(object)
141 client_addr=None
143 client_addrs=Dict()
142 engine_addrs=None
144 engine_addrs=Dict()
143
145
144
146
145 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
147 def __init__(self, **kwargs):
146 """
148 """
147 # universal:
149 # universal:
148 loop: IOLoop for creating future connections
150 loop: IOLoop for creating future connections
@@ -158,6 +160,8 b' class Hub(object):'
158 engine_addrs: zmq address/protocol dict for engine connections
160 engine_addrs: zmq address/protocol dict for engine connections
159 client_addrs: zmq address/protocol dict for client connections
161 client_addrs: zmq address/protocol dict for client connections
160 """
162 """
163
164 super(Hub, self).__init__(**kwargs)
161 self.ids = set()
165 self.ids = set()
162 self.keytable={}
166 self.keytable={}
163 self.incoming_registrations={}
167 self.incoming_registrations={}
@@ -166,35 +170,44 b' class Hub(object):'
166 self.clients = {}
170 self.clients = {}
167 self.hearts = {}
171 self.hearts = {}
168 # self.mia = set()
172 # self.mia = set()
169
173 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
180 self._idcounter = 0
170 # self.sockets = {}
181 # self.sockets = {}
171 self.loop = loop
182 # self.loop = loop
172 self.session = session
183 # self.session = session
173 self.registrar = registrar
184 # self.registrar = registrar
174 self.clientele = clientele
185 # self.clientele = clientele
175 self.queue = queue
186 # self.queue = queue
176 self.heartbeat = heartbeat
187 # self.heartmonitor = heartbeat
177 self.notifier = notifier
188 # self.notifier = notifier
178 self.db = db
189 # self.db = db
179
190
180 # validate connection dicts:
191 # validate connection dicts:
181 self.client_addrs = client_addrs
192 # self.client_addrs = client_addrs
182 assert isinstance(client_addrs['queue'], str)
193 validate_url_container(self.client_addrs)
183 assert isinstance(client_addrs['control'], str)
194
195 # assert isinstance(self.client_addrs['queue'], str)
196 # assert isinstance(self.client_addrs['control'], str)
184 # self.hb_addrs = hb_addrs
197 # self.hb_addrs = hb_addrs
185 self.engine_addrs = engine_addrs
198 validate_url_container(self.engine_addrs)
186 assert isinstance(engine_addrs['queue'], str)
199 # self.engine_addrs = engine_addrs
187 assert isinstance(client_addrs['control'], str)
200 # assert isinstance(self.engine_addrs['queue'], str)
188 assert len(engine_addrs['heartbeat']) == 2
201 # assert isinstance(self.engine_addrs['control'], str)
202 # assert len(engine_addrs['heartbeat']) == 2
189
203
190 # register our callbacks
204 # register our callbacks
191 self.registrar.on_recv(self.dispatch_register_request)
205 self.registrar.on_recv(self.dispatch_register_request)
192 self.clientele.on_recv(self.dispatch_client_msg)
206 self.clientele.on_recv(self.dispatch_client_msg)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
207 self.monitor.on_recv(self.dispatch_monitor_traffic)
194
208
195 if heartbeat is not None:
209 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
210 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
198
211
199 self.monitor_handlers = { 'in' : self.save_queue_request,
212 self.monitor_handlers = { 'in' : self.save_queue_request,
200 'out': self.save_queue_result,
213 'out': self.save_queue_result,
@@ -218,25 +231,21 b' class Hub(object):'
218 'unregistration_request' : self.unregister_engine,
231 'unregistration_request' : self.unregister_engine,
219 'connection_request': self.connection_request,
232 'connection_request': self.connection_request,
220 }
233 }
221 self.registration_timeout = max(5000, 2*self.heartbeat.period)
222 # this is the stuff that will move to DB:
223 # self.results = {} # completed results
224 self.pending = set() # pending messages, keyed by msg_id
225 self.queues = {} # pending msg_ids keyed by engine_id
226 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
227 self.completed = {} # completed msg_ids keyed by engine_id
228 self.all_completed = set()
229
234
230 logger.info("controller::created controller")
235 logger.info("controller::created controller")
231
236
232 def _new_id(self):
237 @property
238 def _next_id(self):
233 """gemerate a new ID"""
239 """gemerate a new ID"""
234 newid = 0
240 newid = self._idcounter
235 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
241 self._idcounter += 1
236 # print newid, self.ids, self.incoming_registrations
237 while newid in self.ids or newid in incoming:
238 newid += 1
239 return newid
242 return newid
243 # newid = 0
244 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
245 # # print newid, self.ids, self.incoming_registrations
246 # while newid in self.ids or newid in incoming:
247 # newid += 1
248 # return newid
240
249
241 #-----------------------------------------------------------------------------
250 #-----------------------------------------------------------------------------
242 # message validation
251 # message validation
@@ -580,6 +589,9 b' class Hub(object):'
580 return
589 return
581
590
582 parent = msg['parent_header']
591 parent = msg['parent_header']
592 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
594 return
583 msg_id = parent['msg_id']
595 msg_id = parent['msg_id']
584 msg_type = msg['msg_type']
596 msg_type = msg['msg_type']
585 content = msg['content']
597 content = msg['content']
@@ -631,7 +643,7 b' class Hub(object):'
631 return
643 return
632 heart = content.get('heartbeat', None)
644 heart = content.get('heartbeat', None)
633 """register a new engine, and create the socket(s) necessary"""
645 """register a new engine, and create the socket(s) necessary"""
634 eid = self._new_id()
646 eid = self._next_id
635 # print (eid, queue, reg, heart)
647 # print (eid, queue, reg, heart)
636
648
637 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
@@ -644,10 +656,12 b' class Hub(object):'
644 raise KeyError("queue_id %r in use"%queue)
656 raise KeyError("queue_id %r in use"%queue)
645 except:
657 except:
646 content = wrap_exception()
658 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
647 elif heart in self.hearts: # need to check unique hearts?
660 elif heart in self.hearts: # need to check unique hearts?
648 try:
661 try:
649 raise KeyError("heart_id %r in use"%heart)
662 raise KeyError("heart_id %r in use"%heart)
650 except:
663 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
651 content = wrap_exception()
665 content = wrap_exception()
652 else:
666 else:
653 for h, pack in self.incoming_registrations.iteritems():
667 for h, pack in self.incoming_registrations.iteritems():
@@ -655,12 +669,14 b' class Hub(object):'
655 try:
669 try:
656 raise KeyError("heart_id %r in use"%heart)
670 raise KeyError("heart_id %r in use"%heart)
657 except:
671 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
658 content = wrap_exception()
673 content = wrap_exception()
659 break
674 break
660 elif queue == pack[1]:
675 elif queue == pack[1]:
661 try:
676 try:
662 raise KeyError("queue_id %r in use"%queue)
677 raise KeyError("queue_id %r in use"%queue)
663 except:
678 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
664 content = wrap_exception()
680 content = wrap_exception()
665 break
681 break
666
682
@@ -669,15 +685,15 b' class Hub(object):'
669 ident=reg)
685 ident=reg)
670
686
671 if content['status'] == 'ok':
687 if content['status'] == 'ok':
672 if heart in self.heartbeat.hearts:
688 if heart in self.heartmonitor.hearts:
673 # already beating
689 # already beating
674 self.incoming_registrations[heart] = (eid,queue,reg,None)
690 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
675 self.finish_registration(heart)
691 self.finish_registration(heart)
676 else:
692 else:
677 purge = lambda : self._purge_stalled_registration(heart)
693 purge = lambda : self._purge_stalled_registration(heart)
678 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
694 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
679 dc.start()
695 dc.start()
680 self.incoming_registrations[heart] = (eid,queue,reg,dc)
696 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
681 else:
697 else:
682 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
683 return eid
699 return eid
@@ -718,7 +734,8 b' class Hub(object):'
718 control = queue
734 control = queue
719 self.ids.add(eid)
735 self.ids.add(eid)
720 self.keytable[eid] = queue
736 self.keytable[eid] = queue
721 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
737 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
738 control=control, heartbeat=heart)
722 self.by_ident[queue] = eid
739 self.by_ident[queue] = eid
723 self.queues[eid] = list()
740 self.queues[eid] = list()
724 self.tasks[eid] = list()
741 self.tasks[eid] = list()
@@ -11,6 +11,8 b' Python Scheduler exists.'
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 from random import randint,random
13 from random import randint,random
14 import logging
15 from types import FunctionType
14
16
15 try:
17 try:
16 import numpy
18 import numpy
@@ -21,17 +23,22 b' import zmq'
21 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
22
24
23 # local imports
25 # local imports
24 from IPython.zmq.log import logger # a Logger object
26 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
29
25 from client import Client
30 from client import Client
26 from dependency import Dependency
31 from dependency import Dependency
27 import streamsession as ss
32 import streamsession as ss
33 from entry_point import connect_logger, local_logger
28
34
29 from IPython.external.decorator import decorator
35
36 logger = logging.getLogger()
30
37
31 @decorator
38 @decorator
32 def logged(f,self,*args,**kwargs):
39 def logged(f,self,*args,**kwargs):
33 # print ("#--------------------")
40 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 # print ("#--")
42 # print ("#--")
36 return f(self,*args, **kwargs)
43 return f(self,*args, **kwargs)
37
44
@@ -99,7 +106,7 b' def leastload(loads):'
99 #---------------------------------------------------------------------
106 #---------------------------------------------------------------------
100 # Classes
107 # Classes
101 #---------------------------------------------------------------------
108 #---------------------------------------------------------------------
102 class TaskScheduler(object):
109 class TaskScheduler(Configurable):
103 """Python TaskScheduler object.
110 """Python TaskScheduler object.
104
111
105 This is the simplest object that supports msg_id based
112 This is the simplest object that supports msg_id based
@@ -108,10 +115,15 b' class TaskScheduler(object):'
108
115
109 """
116 """
110
117
111 scheme = leastload # function for determining the destination
118 # configurables:
112 client_stream = None # client-facing stream
119 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
113 engine_stream = None # engine-facing stream
120 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
114 mon_stream = None # controller-facing stream
121 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
122 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
123 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
124 io_loop = Instance(ioloop.IOLoop)
125
126 # internals:
115 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 pending = None # dict by engine_uuid of submitted tasks
129 pending = None # dict by engine_uuid of submitted tasks
@@ -123,23 +135,10 b' class TaskScheduler(object):'
123 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124
136
125
137
126 def __init__(self, client_stream, engine_stream, mon_stream,
138 def __init__(self, **kwargs):
127 notifier_stream, scheme=None, io_loop=None):
139 super(TaskScheduler, self).__init__(**kwargs)
128 if io_loop is None:
129 io_loop = ioloop.IOLoop.instance()
130 self.io_loop = io_loop
131 self.client_stream = client_stream
132 self.engine_stream = engine_stream
133 self.mon_stream = mon_stream
134 self.notifier_stream = notifier_stream
135
136 if scheme is not None:
137 self.scheme = scheme
138 else:
139 self.scheme = TaskScheduler.scheme
140
140
141 self.session = ss.StreamSession(username="TaskScheduler")
141 self.session = ss.StreamSession(username="TaskScheduler")
142
143 self.dependencies = {}
142 self.dependencies = {}
144 self.depending = {}
143 self.depending = {}
145 self.completed = {}
144 self.completed = {}
@@ -150,12 +149,13 b' class TaskScheduler(object):'
150 self.targets = []
149 self.targets = []
151 self.loads = []
150 self.loads = []
152
151
153 engine_stream.on_recv(self.dispatch_result, copy=False)
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 self._notification_handlers = dict(
153 self._notification_handlers = dict(
155 registration_notification = self._register_engine,
154 registration_notification = self._register_engine,
156 unregistration_notification = self._unregister_engine
155 unregistration_notification = self._unregister_engine
157 )
156 )
158 self.notifier_stream.on_recv(self.dispatch_notification)
157 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
159
159
160 def resume_receiving(self):
160 def resume_receiving(self):
161 """Resume accepting jobs."""
161 """Resume accepting jobs."""
@@ -183,6 +183,7 b' class TaskScheduler(object):'
183 handler(str(msg['content']['queue']))
183 handler(str(msg['content']['queue']))
184 except KeyError:
184 except KeyError:
185 logger.error("task::Invalid notification msg: %s"%msg)
185 logger.error("task::Invalid notification msg: %s"%msg)
186
186 @logged
187 @logged
187 def _register_engine(self, uid):
188 def _register_engine(self, uid):
188 """New engine with ident `uid` became available."""
189 """New engine with ident `uid` became available."""
@@ -306,7 +307,8 b' class TaskScheduler(object):'
306 self.add_job(idx)
307 self.add_job(idx)
307 self.pending[target][msg_id] = (msg, follow)
308 self.pending[target][msg_id] = (msg, follow)
308 content = dict(msg_id=msg_id, engine_id=target)
309 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
310 self.session.send(self.mon_stream, 'task_destination', content=content,
311 ident=['tracktask',self.session.session])
310
312
311 #-----------------------------------------------------------------------
313 #-----------------------------------------------------------------------
312 # Result Handling
314 # Result Handling
@@ -395,7 +397,7 b' class TaskScheduler(object):'
395
397
396
398
397
399
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
400 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
399 from zmq.eventloop import ioloop
401 from zmq.eventloop import ioloop
400 from zmq.eventloop.zmqstream import ZMQStream
402 from zmq.eventloop.zmqstream import ZMQStream
401
403
@@ -414,7 +416,15 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):"
414 nots.setsockopt(zmq.SUBSCRIBE, '')
416 nots.setsockopt(zmq.SUBSCRIBE, '')
415 nots.connect(not_addr)
417 nots.connect(not_addr)
416
418
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
419 # setup logging
420 if log_addr:
421 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
422 else:
423 local_logger(loglevel)
424
425 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
426 mon_stream=mons,notifier_stream=nots,
427 scheme=scheme,io_loop=loop)
418
428
419 loop.start()
429 loop.start()
420
430
@@ -15,6 +15,7 b' import os'
15 import sys
15 import sys
16 import time
16 import time
17 import traceback
17 import traceback
18 import logging
18 from datetime import datetime
19 from datetime import datetime
19 from signal import SIGTERM, SIGKILL
20 from signal import SIGTERM, SIGKILL
20 from pprint import pprint
21 from pprint import pprint
@@ -25,9 +26,8 b' from zmq.eventloop import ioloop, zmqstream'
25
26
26 # Local imports.
27 # Local imports.
27 from IPython.core import ultratb
28 from IPython.core import ultratb
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
31 from IPython.zmq.iostream import OutStream
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
32 from IPython.zmq.displayhook import DisplayHook
33
33
@@ -38,6 +38,8 b' from dependency import UnmetDependency'
38 import heartmonitor
38 import heartmonitor
39 from client import Client
39 from client import Client
40
40
41 logger = logging.getLogger()
42
41 def printer(*args):
43 def printer(*args):
42 pprint(args, stream=sys.__stdout__)
44 pprint(args, stream=sys.__stdout__)
43
45
@@ -51,8 +53,9 b' class Kernel(HasTraits):'
51 # Kernel interface
53 # Kernel interface
52 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
53
55
56 id = Int(-1)
54 session = Instance(StreamSession)
57 session = Instance(StreamSession)
55 shell_streams = Instance(list)
58 shell_streams = List()
56 control_stream = Instance(zmqstream.ZMQStream)
59 control_stream = Instance(zmqstream.ZMQStream)
57 task_stream = Instance(zmqstream.ZMQStream)
60 task_stream = Instance(zmqstream.ZMQStream)
58 iopub_stream = Instance(zmqstream.ZMQStream)
61 iopub_stream = Instance(zmqstream.ZMQStream)
@@ -62,7 +65,8 b' class Kernel(HasTraits):'
62 def __init__(self, **kwargs):
65 def __init__(self, **kwargs):
63 super(Kernel, self).__init__(**kwargs)
66 super(Kernel, self).__init__(**kwargs)
64 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
68 self.prefix = 'engine.%s'%self.id
69 logger.root_topic = self.prefix
66 self.user_ns = {}
70 self.user_ns = {}
67 self.history = []
71 self.history = []
68 self.compiler = CommandCompiler()
72 self.compiler = CommandCompiler()
@@ -108,8 +112,8 b' class Kernel(HasTraits):'
108
112
109 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
113 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
110 # msg = self.reply_socket.recv_json()
114 # msg = self.reply_socket.recv_json()
111 print ("Aborting:", file=sys.__stdout__)
115 logger.info("Aborting:")
112 print (Message(msg), file=sys.__stdout__)
116 logger.info(str(msg))
113 msg_type = msg['msg_type']
117 msg_type = msg['msg_type']
114 reply_type = msg_type.split('_')[0] + '_reply'
118 reply_type = msg_type.split('_')[0] + '_reply'
115 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -117,7 +121,7 b' class Kernel(HasTraits):'
117 # self.reply_socket.send_json(reply_msg)
121 # self.reply_socket.send_json(reply_msg)
118 reply_msg = self.session.send(stream, reply_type,
122 reply_msg = self.session.send(stream, reply_type,
119 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
120 print(Message(reply_msg), file=sys.__stdout__)
124 logger.debug(str(reply_msg))
121 # We need to wait a bit for requests to come in. This can probably
125 # We need to wait a bit for requests to come in. This can probably
122 # be set shorter for true asynchronous clients.
126 # be set shorter for true asynchronous clients.
123 time.sleep(0.05)
127 time.sleep(0.05)
@@ -135,7 +139,7 b' class Kernel(HasTraits):'
135 content = dict(status='ok')
139 content = dict(status='ok')
136 reply_msg = self.session.send(stream, 'abort_reply', content=content,
140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
137 parent=parent, ident=ident)[0]
141 parent=parent, ident=ident)[0]
138 print(Message(reply_msg), file=sys.__stdout__)
142 logger(Message(reply_msg), file=sys.__stdout__)
139
143
140 def shutdown_request(self, stream, ident, parent):
144 def shutdown_request(self, stream, ident, parent):
141 """kill ourself. This should really be handled in an external process"""
145 """kill ourself. This should really be handled in an external process"""
@@ -168,7 +172,7 b' class Kernel(HasTraits):'
168
172
169 handler = self.control_handlers.get(msg['msg_type'], None)
173 handler = self.control_handlers.get(msg['msg_type'], None)
170 if handler is None:
174 if handler is None:
171 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
172 else:
176 else:
173 handler(self.control_stream, idents, msg)
177 handler(self.control_stream, idents, msg)
174
178
@@ -211,13 +215,12 b' class Kernel(HasTraits):'
211 try:
215 try:
212 code = parent[u'content'][u'code']
216 code = parent[u'content'][u'code']
213 except:
217 except:
214 print("Got bad msg: ", file=sys.__stderr__)
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
215 print(Message(parent), file=sys.__stderr__)
216 return
219 return
217 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
218 # self.iopub_stream.send(pyin_msg)
221 # self.iopub_stream.send(pyin_msg)
219 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
223 ident='%s.pyin'%self.prefix)
221 started = datetime.now().strftime(ISO8601)
224 started = datetime.now().strftime(ISO8601)
222 try:
225 try:
223 comp_code = self.compiler(code, '<zmq-kernel>')
226 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -231,7 +234,7 b' class Kernel(HasTraits):'
231 exc_content = self._wrap_exception('execute')
234 exc_content = self._wrap_exception('execute')
232 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
235 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
233 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
236 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
237 ident='%s.pyerr'%self.prefix)
235 reply_content = exc_content
238 reply_content = exc_content
236 else:
239 else:
237 reply_content = {'status' : 'ok'}
240 reply_content = {'status' : 'ok'}
@@ -240,7 +243,7 b' class Kernel(HasTraits):'
240 # self.reply_socket.send_json(reply_msg)
243 # self.reply_socket.send_json(reply_msg)
241 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
242 ident=ident, subheader = dict(started=started))
245 ident=ident, subheader = dict(started=started))
243 print(Message(reply_msg), file=sys.__stdout__)
246 logger.debug(str(reply_msg))
244 if reply_msg['content']['status'] == u'error':
247 if reply_msg['content']['status'] == u'error':
245 self.abort_queues()
248 self.abort_queues()
246
249
@@ -262,8 +265,7 b' class Kernel(HasTraits):'
262 msg_id = parent['header']['msg_id']
265 msg_id = parent['header']['msg_id']
263 bound = content.get('bound', False)
266 bound = content.get('bound', False)
264 except:
267 except:
265 print("Got bad msg: ", file=sys.__stderr__)
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
266 print(Message(parent), file=sys.__stderr__)
267 return
269 return
268 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
269 # self.iopub_stream.send(pyin_msg)
271 # self.iopub_stream.send(pyin_msg)
@@ -316,7 +318,7 b' class Kernel(HasTraits):'
316 exc_content = self._wrap_exception('apply')
318 exc_content = self._wrap_exception('apply')
317 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
319 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
318 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
320 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
321 ident='%s.pyerr'%self.prefix)
320 reply_content = exc_content
322 reply_content = exc_content
321 result_buf = []
323 result_buf = []
322
324
@@ -354,7 +356,7 b' class Kernel(HasTraits):'
354 return
356 return
355 handler = self.shell_handlers.get(msg['msg_type'], None)
357 handler = self.shell_handlers.get(msg['msg_type'], None)
356 if handler is None:
358 if handler is None:
357 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
358 else:
360 else:
359 handler(stream, idents, msg)
361 handler(stream, idents, msg)
360
362
@@ -398,7 +400,7 b' class Kernel(HasTraits):'
398 # # don't busywait
400 # # don't busywait
399 # time.sleep(1e-3)
401 # time.sleep(1e-3)
400
402
401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
403 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
402 client_addr=None, loop=None, context=None, key=None,
404 client_addr=None, loop=None, context=None, key=None,
403 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
405 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404
406
@@ -410,7 +412,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
410 c = context
412 c = context
411 session = StreamSession(key=key)
413 session = StreamSession(key=key)
412 # print (session.key)
414 # print (session.key)
413 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
415 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
414
416
415 # create Control Stream
417 # create Control Stream
416 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
418 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
@@ -433,12 +435,12 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
433 # Redirect input streams and set a display hook.
435 # Redirect input streams and set a display hook.
434 if out_stream_factory:
436 if out_stream_factory:
435 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
437 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
438 sys.stdout.topic = 'engine.%i.stdout'%int_id
437 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
439 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
440 sys.stderr.topic = 'engine.%i.stderr'%int_id
439 if display_hook_factory:
441 if display_hook_factory:
440 sys.displayhook = display_hook_factory(session, iopub_stream)
442 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
443 sys.displayhook.topic = 'engine.%i.pyout'%int_id
442
444
443
445
444 # launch heartbeat
446 # launch heartbeat
@@ -451,7 +453,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
451 else:
453 else:
452 client = None
454 client = None
453
455
454 kernel = Kernel(session=session, control_stream=control_stream,
456 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
455 shell_streams=shell_streams, iopub_stream=iopub_stream,
457 shell_streams=shell_streams, iopub_stream=iopub_stream,
456 client=client, loop=loop)
458 client=client, loop=loop)
457 kernel.start()
459 kernel.start()
@@ -1,4 +1,5 b''
1 """some generic utilities"""
1 """some generic utilities"""
2 import re
2
3
3 class ReverseDict(dict):
4 class ReverseDict(dict):
4 """simple double-keyed subset of dict methods."""
5 """simple double-keyed subset of dict methods."""
@@ -33,3 +34,46 b' class ReverseDict(dict):'
33 return default
34 return default
34
35
35
36
37 def validate_url(url):
38 """validate a url for zeromq"""
39 if not isinstance(url, basestring):
40 raise TypeError("url must be a string, not %r"%type(url))
41 url = url.lower()
42
43 proto_addr = url.split('://')
44 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 proto, addr = proto_addr
46 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47
48 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # author: Remi Sabourin
50 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51
52 if proto == 'tcp':
53 lis = addr.split(':')
54 assert len(lis) == 2, 'Invalid url: %r'%url
55 addr,s_port = lis
56 try:
57 port = int(s_port)
58 except ValueError:
59 raise AssertionError("Invalid port %r in url: %r"%(port, url))
60
61 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62
63 else:
64 # only validate tcp urls currently
65 pass
66
67 return True
68
69
70 def validate_url_container(container):
71 """validate a potentially nested collection of urls."""
72 if isinstance(container, basestring):
73 url = container
74 return validate_url(url)
75 elif isinstance(container, dict):
76 container = container.itervalues()
77
78 for element in container:
79 validate_url_container(element) No newline at end of file
@@ -19,6 +19,7 b''
19 # You should have received a copy of the Lesser GNU General Public License
19 # You should have received a copy of the Lesser GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21
21
22 import sys
22 import zmq
23 import zmq
23 logport = 20202
24 logport = 20202
24 def main(topics, addrs):
25 def main(topics, addrs):
@@ -26,20 +27,27 b' def main(topics, addrs):'
26 context = zmq.Context()
27 context = zmq.Context()
27 socket = context.socket(zmq.SUB)
28 socket = context.socket(zmq.SUB)
28 for topic in topics:
29 for topic in topics:
30 print "Subscribing to: %r"%topic
29 socket.setsockopt(zmq.SUBSCRIBE, topic)
31 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 if addrs:
32 if addrs:
31 for addr in addrs:
33 for addr in addrs:
32 print "Connecting to: ", addr
34 print "Connecting to: ", addr
33 socket.connect(addr)
35 socket.connect(addr)
34 else:
36 else:
35 socket.bind('tcp://127.0.0.1:%i'%logport)
37 socket.bind('tcp://*:%i'%logport)
36
38
37 while True:
39 while True:
38 # topic = socket.recv()
40 # topic = socket.recv()
39 # print topic
41 # print topic
40 topic, msg = socket.recv_multipart()
42 # print 'tic'
41 # msg = socket.recv_pyobj()
43 raw = socket.recv_multipart()
44 if len(raw) != 2:
45 print "!!! invalid log message: %s"%raw
46 else:
47 topic, msg = raw
48 # don't newline, since log messages always newline:
42 print "%s | %s " % (topic, msg),
49 print "%s | %s" % (topic, msg),
50 sys.stdout.flush()
43
51
44 if __name__ == '__main__':
52 if __name__ == '__main__':
45 import sys
53 import sys
General Comments 0
You need to be logged in to leave comments. Login now